journal de suivi des consultations (#51974) #22

Merged
fpeters merged 3 commits from wip/51974-audit into main 2023-01-13 14:47:24 +01:00
16 changed files with 750 additions and 39 deletions

View File

@ -0,0 +1,166 @@
import datetime
import pytest
from wcs.audit import Audit
from wcs.formdef import FormDef
from wcs.qommon import audit
from wcs.qommon.http_request import HTTPRequest
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from .test_all import create_superuser
@pytest.fixture
def superuser(pub):
return create_superuser(pub)
@pytest.fixture
def pub(request, emails):
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
return pub
def teardown_module(module):
clean_temporary_pub()
def test_formdata_audit(pub, superuser):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
app = login(get_app(pub))
app.get(formdef.get_url(backoffice=True))
app.get(formdata.get_backoffice_url())
assert Audit.count() == 2
audit1, audit2 = Audit.select(order_by='id')
assert audit1.action == 'listing'
assert audit2.action == 'view'
assert audit1.object_type == audit2.object_type == 'formdef'
assert audit1.object_id == audit2.object_id == str(formdef.id)
assert audit1.data_id is None
assert audit2.data_id == formdata.id
assert audit2.frozen['user_email'] == superuser.email
assert audit2.frozen['object_slug'] == formdef.slug
def test_audit_journal(pub, superuser):
Audit.wipe()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdef2 = FormDef()
formdef2.name = 'form title 2'
formdef2.store()
formdef2.data_class().wipe()
formdata2 = formdef2.data_class()()
formdata2.just_created()
formdata2.store()
app = login(get_app(pub))
resp = app.get('/backoffice/journal/') # visit empty
assert resp.pyquery('tbody tr').length == 0
for i in range(5):
audit('listing', obj=formdef, user_id=superuser.id)
for i in range(50):
audit('view', obj=formdata, user_id=superuser.id)
# create audit object in the past
audit_obj = Audit.select(order_by='-id')[0]
audit_obj.id = None
audit_obj.timestamp = audit_obj.timestamp - datetime.timedelta(days=40)
audit_obj.store()
# additional audit events
audit('export.csv', obj=formdef, user_id=superuser.id)
audit('export.csv', obj=formdef2, user_id=superuser.id)
audit('download file', obj=formdata2, user_id=superuser.id, extra_label='file.png')
resp = app.get('/backoffice/studio/')
resp = resp.click('Audit Journal')
assert resp.pyquery('.journal-table--user:first').text() == 'admin'
assert resp.pyquery('tbody tr').length == 10
resp = resp.click('Next page')
assert resp.pyquery('tbody tr').length == 10
resp = resp.click('Previous page')
assert resp.pyquery('tbody tr').length == 10
resp = resp.click('First page')
assert resp.pyquery('tbody tr').length == 10
assert resp.pyquery('.button.first-page.disabled')
resp = resp.click('Last page')
assert resp.pyquery('tbody tr').length == 10
assert resp.pyquery('.button.last-page.disabled')
resp.form['date'] = audit_obj.timestamp.strftime('%Y-%m-%d')
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 1
resp.form['date'] = ''
resp.form['action'] = 'listing'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 5
resp.form['user_id'].force_value('12')
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 0
resp.form['user_id'].force_value(superuser.id)
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 5
assert resp.form['user_id'].value == str(superuser.id)
assert resp.form['user_id'].options[-1] == (str(superuser.id), True, 'admin')
resp.form['action'] = 'export.csv'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 2
assert resp.pyquery('[data-widget-name="object_id"]').attr.style == 'display: none'
resp.form['object'] = f'formdef:{formdef2.id}'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 1
assert resp.pyquery('[data-widget-name="object_id"]').attr.style != 'display: none'
resp.form['action'] = ''
resp.form['object_id'] = formdata2.id
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 1
resp.form['object_id'] = 'XXX'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 0
# check journal is still displayed correctly after formdef removal
resp.form['object_id'] = ''
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 2
assert resp.pyquery('.journal-table--description')[-1].text == 'CSV Export - form title 2'
formdef2.remove_self()
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 2
assert resp.pyquery('.journal-table--description')[-1].text == 'CSV Export - form title 2'

48
tests/test_audit.py Normal file
View File

@ -0,0 +1,48 @@
import pytest
from django.utils.timezone import now
from wcs.audit import Audit
from wcs.formdef import FormDef
from wcs.qommon import audit
from wcs.qommon.http_request import HTTPRequest
from .utilities import clean_temporary_pub, create_temporary_pub
@pytest.fixture
def pub(request):
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.load_site_options()
return pub
def teardown_module(module):
clean_temporary_pub()
def test_audit_clean_job(pub, freezer):
Audit.wipe()
FormDef.wipe()
pub.user_class.wipe()
user = pub.user_class()
user.name = 'Test'
user.store()
formdef = FormDef()
formdef.name = 'form title'
formdef.store()
current_timestamp = now()
freezer.move_to('2018-12-01T00:00:00')
audit('listing', obj=formdef, user=user.id)
audit('listing', obj=formdef, user=user.id)
freezer.move_to(current_timestamp)
audit('listing', obj=formdef, user=user.id)
assert Audit.count() == 3
Audit.clean()
assert Audit.count() == 1

View File

@ -127,6 +127,7 @@ def create_temporary_pub(pickle_mode=False, lazy_mode=False):
pub.cfg['postgresql'] = {'database': known_elements.sql_db_name, 'user': os.environ['USER']}
pub.loggederror_class.wipe()
sql.WorkflowTrace.wipe()
sql.Audit.wipe()
pub.write_cfg()
return pub
@ -163,6 +164,7 @@ def create_temporary_pub(pickle_mode=False, lazy_mode=False):
sql.do_custom_views_table()
sql.do_snapshots_table()
sql.do_loggederrors_table()
sql.Audit.do_table()
sql.do_meta_table()
TestDef.do_table()
sql.WorkflowTrace.do_table()

98
wcs/audit.py Normal file
View File

@ -0,0 +1,98 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2022 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
from django.utils.timezone import now
from quixote import get_publisher, get_request
from wcs import sql
from wcs.qommon import _
class Audit(sql.Audit):
id = None
timestamp = None
action = None
url = None
user_id = None
object_type = None # (formdef, carddef, etc.)
object_id = None
data_id = None # (for formdata and carddata)
extra_data = None
@classmethod
def record(cls, action, obj=None, user_id=None, **kwargs):
audit = cls()
audit.action = action
audit.timestamp = now()
request = get_request()
user = None
if user_id:
audit.user_id = user_id
user = get_publisher().user_class.get(audit.user_id, ignore_errors=True)
elif request:
user = request.get_user()
if user and hasattr(user, 'id'):
audit.user_id = user.id
if request:
audit.url = request.get_path_query()
if obj:
if hasattr(obj, '_formdef'): # formdata or carddata
audit.data_id = obj.id
obj = obj._formdef
audit.object_type = obj.xml_root_node
audit.object_id = obj.id
audit.extra_data = kwargs
audit.frozen = {
'user_email': getattr(user, 'email', None),
'user_full_name': getattr(user, 'display_name', None),
'user_nameid': getattr(user, 'nameid', None),
'object_slug': getattr(obj, 'slug', None),
'object_name': getattr(obj, 'name', None),
}
audit.store()
@classmethod
def get_action_labels(cls):
return {
'listing': _('Listing'),
'export.csv': _('CSV Export'),
'export.ods': _('ODS Export'),
'download file': _('Download of attached file'),
'download files': _('Download of attached files (bundle)'),
'view': _('View Data'),
}
def get_action_description(self):
action_label = self.get_action_labels().get(self.action, self.action)
obj_name = self.frozen.get('object_name')
if self.object_type and self.object_id:
obj_class = get_publisher().get_object_class(self.object_type)
obj = obj_class.get(self.object_id, ignore_errors=True)
if obj:
obj_name = obj.name
parts = [str(action_label), obj_name]
if self.data_id:
parts.append(str(self.data_id))
if self.extra_data and self.extra_data.get('extra_label'):
parts.append(self.extra_data.get('extra_label'))
return ' - '.join(parts)
@classmethod
def clean(cls, **kwargs):
audit_retention_days = get_publisher().get_site_option('audit-retention-days') or 730
Audit.wipe(clause=[sql.Less('timestamp', now() - datetime.timedelta(days=int(audit_retention_days)))])

154
wcs/backoffice/journal.py Normal file
View File

@ -0,0 +1,154 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2022 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
from quixote import get_publisher, get_request, get_response
from quixote.directory import Directory
from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.qommon import _, template
from wcs.qommon.backoffice.menu import html_top
from wcs.qommon.form import DateWidget, Form, OptGroup, SingleSelectWidget, StringWidget
from wcs.qommon.misc import get_as_datetime
from wcs.qommon.storage import Equal, Greater, GreaterOrEqual, Less, Nothing
class JournalDirectory(Directory):
_q_exports = ['']
def html_top(self, title):
return html_top('journal', title)
def _q_traverse(self, path):
get_response().breadcrumb.append(('journal/', _('Audit Journal')))
return super()._q_traverse(path)
def _q_index(self):
from wcs.audit import Audit
self.html_top(_('Audit Journal'))
context = {
'has_sidebar': True,
'html_form': self.get_filter_form(),
}
criterias = []
querystring_parts = []
order_by = '-id'
if get_request().form.get('date'):
dt = get_as_datetime(get_request().form.get('date'))
dtm = dt + datetime.timedelta(days=1)
criterias.append(Less('timestamp', dtm))
criterias.append(GreaterOrEqual('timestamp', dt))
querystring_parts.append('date=%s' % get_request().form.get('date'))
if get_request().form.get('action'):
criterias.append(Equal('action', get_request().form.get('action')))
querystring_parts.append('action=%s' % get_request().form.get('action'))
if get_request().form.get('user_id'):
criterias.append(Equal('user_id', get_request().form.get('user_id')))
querystring_parts.append('user_id=%s' % get_request().form.get('user_id'))
if get_request().form.get('object'):
object_type, object_id = get_request().form.get('object').split(':')
criterias.append(Equal('object_type', object_type))
criterias.append(Equal('object_id', object_id))
querystring_parts.append('object=%s' % get_request().form.get('object'))
formdata_id = get_request().form.get('object_id')
if formdata_id:
querystring_parts.append('object_id=%s' % formdata_id)
try:
formdata_id = int(formdata_id)
except ValueError:
criterias.append(Nothing())
else:
criterias.append(Equal('data_id', formdata_id))
if get_request().form.get('max'):
criterias.append(Less('id', get_request().form.get('max')))
elif get_request().form.get('min'):
criterias.append(Greater('id', get_request().form.get('min')))
order_by = 'id'
context['lines'] = lines = Audit.select(criterias, order_by=order_by, limit=10)
first_id = Audit.get_first_id()
if order_by == 'id':
lines.reverse()
if len(lines) < 10 and get_request().form.get('min'):
get_request().form['min'] = None
return self._q_index()
if lines:
context['last_row_id'] = max(x.id for x in lines)
context['first_row_id'] = min(x.id for x in lines)
elif get_request().form.get('min'):
get_request().form['min'] = None
return self._q_index()
if first_id in [x.id for x in lines]:
# on latest page
context['no_next'] = True
if not get_request().form.get('min') and not get_request().form.get('max'):
context['no_prev'] = True
context['latest_page_id'] = first_id + 10
context['extra_qs'] = '&'.join(querystring_parts)
return template.QommonTemplateResponse(
templates=['wcs/backoffice/journal.html'], context=context, is_django_native=True
)
def is_accessible(self, user):
return user.can_go_in_admin()
def get_filter_form(self):
from wcs.audit import Audit
get_response().add_javascript(['select2.js'])
form = Form(method='get', action='.', id='journal-filter')
form.add(DateWidget, 'date', title=_('Date'), date_in_the_past=True, date_can_be_today=True)
user_options = [(None, '', '')]
if get_request().form.get('user_id'):
user = get_publisher().user_class.get(get_request().form.get('user_id'), ignore_errors=True)
if user:
user_options.append((user.id, str(user), user.id))
form.add(
SingleSelectWidget, 'user_id', title=_('User'), options=user_options, class_='user-selection'
)
formdefs = FormDef.select(order_by='name', lightweight=True, ignore_errors=True)
carddefs = CardDef.select(order_by='name', lightweight=True, ignore_errors=True)
object_options = [(None, '', '')]
if formdefs and carddefs:
object_options.append(OptGroup(_('Forms')))
object_options.extend([(f'formdef:{x.id}', x.name, f'formdef:{x.id}') for x in formdefs])
if formdefs and carddefs:
object_options.append(OptGroup(_('Card Models')))
object_options.extend([(f'carddef:{x.id}', x.name, f'carddef:{x.id}') for x in carddefs])
form.add(SingleSelectWidget, 'object', title=_('Form/Card'), options=object_options)
widget = form.add(StringWidget, 'object_id', title=_('Form/Card Identifier'))
if not form.get_widget('object').parse():
widget.is_hidden = True
form.add(
SingleSelectWidget,
'action',
title=_('Action'),
options=[('', '', '')] + [(x[0], x[1], x[0]) for x in Audit.get_action_labels().items()],
)
form.add_submit('submit', _('Search'))
return form

View File

@ -46,7 +46,7 @@ from wcs.roles import logged_users_role
from wcs.variables import LazyFieldVar, LazyList
from wcs.workflows import WorkflowStatusItem, item_classes, template_on_formdata
from ..qommon import _, errors, ezt, force_str, get_cfg, misc, ngettext, ods, pgettext_lazy, template
from ..qommon import _, audit, errors, ezt, force_str, get_cfg, misc, ngettext, ods, pgettext_lazy, template
from ..qommon.afterjobs import AfterJob
from ..qommon.backoffice.listing import pagination_links
from ..qommon.backoffice.menu import html_top
@ -2200,6 +2200,9 @@ class FormPage(FormdefDirectoryBase):
multi_form.widgets.append(HtmlWidget(table))
if not multi_actions:
multi_form.widgets.append(HtmlWidget('<div class="buttons"></div>'))
audit('listing', obj=self.formdef, refresh=bool(get_request().form.get('ajax') == 'true'))
if get_request().form.get('ajax') == 'true':
get_request().ignore_session = True
get_response().filter = {'raw': True}
@ -2313,6 +2316,7 @@ class FormPage(FormdefDirectoryBase):
get_request().form = parse_query(form.get_widget('query_string').parse() or '', 'utf-8')
get_request().form['skip_header_line'] = not (form.get_widget('include_header_line').parse())
file_format = form.get_widget('format').parse()
audit('export.%s' % file_format, obj=self.formdef)
if file_format == 'csv':
return self.csv()
elif file_format == 'json':
@ -3354,6 +3358,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
if isinstance(subvalue_elem, PicklableUpload):
add_zip_file(subvalue_elem, zip_file)
audit('download files', obj=formdata)
response = get_response()
response.set_content_type('application/zip')
response.set_header(

View File

@ -34,13 +34,14 @@ from ..qommon.backoffice.menu import html_top
from .cards import CardsDirectory
from .data_management import DataManagementDirectory
from .i18n import I18nDirectory
from .journal import JournalDirectory
from .management import ManagementDirectory
from .studio import StudioDirectory
from .submission import SubmissionDirectory
class RootDirectory(BackofficeRootDirectory):
_q_exports = ['', 'pending', 'statistics', ('menu.json', 'menu_json'), 'processing']
_q_exports = ['', 'pending', 'statistics', ('menu.json', 'menu_json'), 'processing', 'journal']
forms = wcs.admin.forms.FormsDirectory()
roles = wcs.admin.roles.RolesDirectory()
@ -48,6 +49,7 @@ class RootDirectory(BackofficeRootDirectory):
users = wcs.admin.users.UsersDirectory()
workflows = wcs.admin.workflows.WorkflowsDirectory()
management = ManagementDirectory()
journal = JournalDirectory()
studio = StudioDirectory()
cards = CardsDirectory()
data = DataManagementDirectory()

View File

@ -120,6 +120,8 @@ class StudioDirectory(Directory):
object_types += [CardDef]
if backoffice_root.is_accessible('i18n') and get_publisher().has_i18n_enabled():
extra_links.append(('../i18n/', pgettext('studio', 'Multilinguism')))
if backoffice_root.is_accessible('journal'):
extra_links.append(('../journal/', pgettext('studio', 'Audit Journal')))
user = get_request().user
context = {

View File

@ -34,7 +34,7 @@ from wcs.qommon.admin.texts import TextsDirectory
from wcs.wf.editable import EditableWorkflowStatusItem
from wcs.workflows import RedisplayFormException
from ..qommon import _, errors, misc, template
from ..qommon import _, audit, errors, misc, template
class FileDirectory(Directory):
@ -80,6 +80,10 @@ class FileDirectory(Directory):
redirect_url = sign_url_auto_orig(redirect_url)
return redirect(redirect_url)
if not self.thumbnails:
# do not log access to thumbnails as they will already be accounted for as
# a view of the formdata/carddata containing them.
audit('download file', obj=self.formdata, extra_label=component)
return self.serve_file(file, thumbnail=self.thumbnails)
@classmethod
@ -665,6 +669,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
return response
get_response().add_javascript(['jquery.js', 'qommon.forms.js'])
audit('view', obj=self.filled)
self.html_top('%s - %s' % (self.formdef.name, self.filled.id))
r = TemplateIO(html=True)
r += get_session().display_message()
@ -776,10 +781,17 @@ class FormStatusPage(Directory, FormTemplateMixin):
if not hasattr(field_data, 'file_digest'):
continue
if field_data.file_digest() == file_digest:
return FileDirectory.serve_file(
field_data,
thumbnail=bool(get_request().form.get('thumbnail') and field_data.can_thumbnail()),
)
thumbnail = bool(get_request().form.get('thumbnail') and field_data.can_thumbnail())
if not thumbnail:
# do not log access to thumbnails as they will already be accounted for as
# a view of the formdata/carddata containing them.
audit(
'download file',
obj=self.filled,
extra_label=str(field_data),
file_digest=file_digest,
)
return FileDirectory.serve_file(field_data, thumbnail=thumbnail)
elif get_request().form and get_request().form.get('f'):
try:
fn = get_request().form['f']

View File

@ -153,6 +153,12 @@ class WcsPublisher(QommonPublisher):
cls.register_cronjob(
CronJob(cls.clean_deleted_users, name='clean_deleted_users', hours=[3], minutes=[0])
)
# once a day clean old audit entries
from .audit import Audit
cls.register_cronjob(CronJob(Audit.clean, name='clean_audit', hours=[3], minutes=[0]))
# other jobs
data_sources.register_cronjob()
formdef.register_cronjobs()
@ -399,6 +405,7 @@ class WcsPublisher(QommonPublisher):
sql.do_loggederrors_table()
sql.do_tokens_table()
sql.WorkflowTrace.do_table()
sql.Audit.do_table()
sql.do_meta_table()
from .carddef import CardDef
from .formdef import FormDef
@ -485,6 +492,33 @@ class WcsPublisher(QommonPublisher):
pass
return logged_exception
def get_object_class(self, object_type):
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.categories import BlockCategory, CardDefCategory, Category, WorkflowCategory
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
for klass in (
BlockDef,
CardDef,
NamedDataSource,
FormDef,
Workflow,
NamedWsCall,
MailTemplate,
Category,
CardDefCategory,
WorkflowCategory,
BlockCategory,
):
if klass.xml_root_node == object_type:
return klass
raise KeyError('no class for object type: %s' % object_type)
def apply_global_action_timeouts(self, **kwargs):
from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger

View File

@ -38,6 +38,12 @@ force_str = force_text
PICKLE_KWARGS = {'encoding': 'bytes', 'fix_imports': True}
def audit(action, **kwargs):
from wcs.audit import Audit
Audit.record(action, **kwargs)
def gettext(message):
pub = get_publisher()
if pub is None:

View File

@ -922,14 +922,22 @@ div.bo-block div#page-links {
padding: 1ex 1.5ex;
}
#page-links .previous-page:before {
.first-page:before {
content: "";
}
.previous-page:before {
content: "<";
}
#page-links .next-page:before {
.next-page:before {
content: ">";
}
.last-page:before {
content: "";
}
div#page-links a {
padding: 1ex 1.5ex;
border: none;
@ -2605,3 +2613,7 @@ div#main-content > h3.field-edit--subtitle {
max-height: 10em;
overflow-y: auto;
}
.journal-table--datetime {
width: 10em;
}

View File

@ -140,6 +140,14 @@ $(function() {
});
});
$('#journal-filter #form_object').on('change', function() {
if (this.value) {
$('[data-widget-name="object_id"]').show();
} else {
$('[data-widget-name="object_id"]').hide();
}
});
/* keep title/slug in sync */
$('body').delegate('input[data-slug-sync]', 'input change paste',
function() {
@ -185,6 +193,7 @@ $(function() {
url: '/api/users/'
},
placeholder: '-',
allowClear: true,
templateResult: function (state) {
if (!state.description) {
return state.text;
@ -197,9 +206,9 @@ $(function() {
return $template_string;
}
}
if ($('div.submit-user-selection').length) {
$('div.submit-user-selection select').select2(user_select2_options);
$('div.submit-user-selection select').on('select2:open', function (e) {
if ($('div.submit-user-selection, select.user-selection').length) {
$('div.submit-user-selection select, select.user-selection').select2(user_select2_options);
$('div.submit-user-selection select, select.user-selection').on('select2:open', function (e) {
var available_height = $(window).height() - $(this).offset().top;
$('ul.select2-results__options').css('max-height', (available_height - 100) + 'px');
});
@ -341,6 +350,27 @@ $(function() {
});
$('[type=radio][name=display_mode]:checked').trigger('change');
function prepate_journal_links() {
$('#journal-page-links a').on('click', function() {
var url = $(this).attr('href');
$.ajax({url: url, dataType: 'html', success: function(html) {
var $html = $(html);
var $table = $html.find('#journal-table');
var $page_links = $html.find('#journal-page-links');
if ($table.length && $page_links.length) {
$('#journal-table').replaceWith($table);
$('#journal-page-links').replaceWith($page_links);
prepate_journal_links();
if (window.history) {
window.history.replaceState(null, null, url);
}
}
}});
return false;
});
}
prepate_journal_links();
// IE doesn't accept periods or dashes in the window name, but the element IDs
// we use to generate popup window names may contain them, therefore we map them
// to allowed characters in a reversible way so that we can locate the correct

View File

@ -196,35 +196,11 @@ class Snapshot:
return instances
def get_object_class(self):
return Snapshot.get_class(self.object_type)
return get_publisher().get_object_class(self.object_type)
@classmethod
def get_class(cls, object_type):
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.categories import BlockCategory, CardDefCategory, Category, WorkflowCategory
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
for klass in (
BlockDef,
CardDef,
NamedDataSource,
FormDef,
Workflow,
NamedWsCall,
MailTemplate,
Category,
CardDefCategory,
WorkflowCategory,
BlockCategory,
):
if klass.xml_root_node == object_type:
return klass
raise KeyError('no class for object type: %s' % object_type)
return get_publisher().get_object_class(object_type)
def get_serialization(self, indented=True):
# there is a complete serialization

View File

@ -4571,6 +4571,117 @@ class WorkflowTrace(SqlMixin):
formdata.store()
class Audit(SqlMixin):
_table_name = 'audit'
_table_static_fields = [
('id', 'bigserial'),
('timestamp', 'timestamptz'),
('action', 'varchar'),
('url', 'varchar'),
('user_id', 'varchar'),
('object_type', 'varchar'),
('object_id', 'varchar'),
('data_id', 'int'),
('extra_data', 'jsonb'),
('frozen', 'jsonb'), # plain copy of user email, object name and slug
]
id = None
@classmethod
@guard_postgres
def do_table(cls):
conn, cur = get_connection_and_cursor()
table_name = cls._table_name
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = %s''',
(table_name,),
)
if cur.fetchone()[0] == 0:
cur.execute(
'''CREATE TABLE %s (id BIGSERIAL,
timestamp TIMESTAMP WITH TIME ZONE,
action VARCHAR,
url VARCHAR,
user_id VARCHAR,
user_email VARCHAR,
object_type VARCHAR,
object_id VARCHAR,
data_id INTEGER,
extra_data JSONB,
frozen JSONB
)'''
% table_name
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s''',
(table_name,),
)
existing_fields = {x[0] for x in cur.fetchall()}
needed_fields = {x[0] for x in Audit._table_static_fields}
# delete obsolete fields
for field in existing_fields - needed_fields:
cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
cur.execute('CREATE INDEX IF NOT EXISTS audit_id_idx ON audit USING btree (id)')
conn.commit()
cur.close()
@guard_postgres
def store(self):
if self.id:
# do not allow updates
raise AssertionError()
sql_dict = {x: getattr(self, x) for x, y in self._table_static_fields}
conn, cur = get_connection_and_cursor()
column_names = [x for x in sql_dict.keys() if x != 'id']
sql_statement = '''INSERT INTO %s (%s)
VALUES (%s)
RETURNING id''' % (
self._table_name,
', '.join(column_names),
', '.join(['%%(%s)s' % x for x in column_names]),
)
cur.execute(sql_statement, sql_dict)
self.id = cur.fetchone()[0]
conn.commit()
cur.close()
@classmethod
def _row2ob(cls, row, **kwargs):
o = cls()
for field, value in zip(cls._table_static_fields, tuple(row)):
setattr(o, field[0], value)
return o
@classmethod
def get_data_fields(cls):
return []
@classmethod
@guard_postgres
def get_first_id(cls):
conn, cur = get_connection_and_cursor()
sql_statement = 'SELECT id FROM audit ORDER BY id LIMIT 1'
cur.execute(sql_statement)
try:
lguerin marked this conversation as resolved Outdated

Ca n'arrivera qu'une fois, mais en local après migration et consultation de la page d'audit, j'ai une trace:

Exception:
  type = '<class 'IndexError'>', value = 'list index out of range'

Stack trace (most recent call first):
  File "/home/lguerin/src/wcs/wcs/sql.py", line 4676, in get_first_id
  4674         sql_statement = 'SELECT id FROM audit ORDER BY id LIMIT 1'
  4675         cur.execute(sql_statement)
> 4676         first_id = cur.fetchall()[0][0]
  4677         conn.commit()
  4678         cur.close()
Ca n'arrivera qu'une fois, mais en local après migration et consultation de la page d'audit, j'ai une trace: ``` Exception: type = '<class 'IndexError'>', value = 'list index out of range' Stack trace (most recent call first): File "/home/lguerin/src/wcs/wcs/sql.py", line 4676, in get_first_id 4674 sql_statement = 'SELECT id FROM audit ORDER BY id LIMIT 1' 4675 cur.execute(sql_statement) > 4676 first_id = cur.fetchall()[0][0] 4677 conn.commit() 4678 cur.close() ```

Pris en compte pour désormais faire

+        try:
+            first_id = cur.fetchall()[0][0]
+        except IndexError:
+            first_id = 0
Pris en compte pour désormais faire ``` + try: + first_id = cur.fetchall()[0][0] + except IndexError: + first_id = 0 ```
first_id = cur.fetchall()[0][0]
except IndexError:
first_id = 0
conn.commit()
cur.close()
return first_id
class classproperty:
def __init__(self, f):
self.f = f
@ -4949,7 +5060,7 @@ def get_period_total(
# latest migration, number + description (description is not used
# programmaticaly but will make sure git conflicts if two migrations are
# separately added with the same number)
SQL_LEVEL = (77, 'use token table for nonces')
SQL_LEVEL = (78, 'add audit table')
def migrate_global_views(conn, cur):
@ -5138,6 +5249,9 @@ def migrate():
# 75: migrate to dedicated workflow traces table
# 76: add index to workflow traces table
WorkflowTrace.do_table()
if sql_level < 78:
# 78: add audit table
Audit.do_table()
if sql_level < 52:
# 2: introduction of formdef_id in views
# 5: add concerned_roles_array, is_at_endpoint and fts to views

View File

@ -0,0 +1,50 @@
{% extends "wcs/backoffice.html" %}
{% load i18n %}
{% block content %}
{% block appbar %}
<div id="appbar">
<h2>{% trans "Audit Journal" %}</h2>
</div>
{% endblock %}
<table id="journal-table" class="main">
<thead>
<tr>
<th>{% trans "Date" %}</th>
<th>{% trans "User" %}</th>
<th>{% trans "Action" %}</th>
</tr>
</thead>
<tbody>
{% for line in lines %}
<tr>
<td class="journal-table--datetime">{{ line.timestamp|date:"DATETIME_FORMAT" }}</td>
<td class="journal-table--user">{{ line.frozen.user_full_name|default:"-" }}</td>
<td class="journal-table--description">{{ line.get_action_description }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<div id="journal-page-links">
<a class="button first-page {% if no_prev%}disabled{% endif %}"
title="{% trans "Most recent activity" %}"
href="./?{{ extra_qs }}"><span class="sr-only">{% trans "First page" %}</span></a>
<a class="button previous-page {% if no_prev%}disabled{% endif %}"
title="{% trans "More recent activity" %}"
href="./?min={{ last_row_id }}&{{ extra_qs }}"><span class="sr-only">{% trans "Previous page" %}</span></a>
<a class="button next-page {% if no_next %}disabled{% endif %}"
title="{% trans "More ancient activity" %}"
href="./?max={{ first_row_id }}&{{ extra_qs }}"><span class="sr-only">{% trans "Next page" %}</span></a>
<a class="button last-page {% if no_next %}disabled{% endif %}"
title="{% trans "Most ancient activity" %}"
href="./?max={{ latest_page_id }}&{{ extra_qs }}"><span class="sr-only">{% trans "Last page" %}</span></a>
</div>
{% endblock %}
{% block sidebar-content %}
<h3>{% trans "Search" %}</h3>
{{ html_form.render|safe }}
{% endblock %}