journal de suivi des consultations (#51974) #22

Merged
fpeters merged 3 commits from wip/51974-audit into main 2023-01-13 14:47:24 +01:00
16 changed files with 750 additions and 39 deletions

View File

@ -0,0 +1,166 @@
import datetime
import pytest
from wcs.audit import Audit
from wcs.formdef import FormDef
from wcs.qommon import audit
from wcs.qommon.http_request import HTTPRequest
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
from .test_all import create_superuser
@pytest.fixture
def superuser(pub):
return create_superuser(pub)
@pytest.fixture
def pub(request, emails):
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.cfg['identification'] = {'methods': ['password']}
pub.cfg['language'] = {'language': 'en'}
pub.write_cfg()
return pub
def teardown_module(module):
clean_temporary_pub()
def test_formdata_audit(pub, superuser):
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
app = login(get_app(pub))
app.get(formdef.get_url(backoffice=True))
app.get(formdata.get_backoffice_url())
assert Audit.count() == 2
audit1, audit2 = Audit.select(order_by='id')
assert audit1.action == 'listing'
assert audit2.action == 'view'
assert audit1.object_type == audit2.object_type == 'formdef'
assert audit1.object_id == audit2.object_id == str(formdef.id)
assert audit1.data_id is None
assert audit2.data_id == formdata.id
assert audit2.frozen['user_email'] == superuser.email
assert audit2.frozen['object_slug'] == formdef.slug
def test_audit_journal(pub, superuser):
Audit.wipe()
FormDef.wipe()
formdef = FormDef()
formdef.name = 'form title'
formdef.store()
formdef.data_class().wipe()
formdata = formdef.data_class()()
formdata.just_created()
formdata.store()
formdef2 = FormDef()
formdef2.name = 'form title 2'
formdef2.store()
formdef2.data_class().wipe()
formdata2 = formdef2.data_class()()
formdata2.just_created()
formdata2.store()
app = login(get_app(pub))
resp = app.get('/backoffice/journal/') # visit empty
assert resp.pyquery('tbody tr').length == 0
for i in range(5):
audit('listing', obj=formdef, user_id=superuser.id)
for i in range(50):
audit('view', obj=formdata, user_id=superuser.id)
# create audit object in the past
audit_obj = Audit.select(order_by='-id')[0]
audit_obj.id = None
audit_obj.timestamp = audit_obj.timestamp - datetime.timedelta(days=40)
audit_obj.store()
# additional audit events
audit('export.csv', obj=formdef, user_id=superuser.id)
audit('export.csv', obj=formdef2, user_id=superuser.id)
audit('download file', obj=formdata2, user_id=superuser.id, extra_label='file.png')
resp = app.get('/backoffice/studio/')
resp = resp.click('Audit Journal')
assert resp.pyquery('.journal-table--user:first').text() == 'admin'
assert resp.pyquery('tbody tr').length == 10
resp = resp.click('Next page')
assert resp.pyquery('tbody tr').length == 10
resp = resp.click('Previous page')
assert resp.pyquery('tbody tr').length == 10
resp = resp.click('First page')
assert resp.pyquery('tbody tr').length == 10
assert resp.pyquery('.button.first-page.disabled')
resp = resp.click('Last page')
assert resp.pyquery('tbody tr').length == 10
assert resp.pyquery('.button.last-page.disabled')
resp.form['date'] = audit_obj.timestamp.strftime('%Y-%m-%d')
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 1
resp.form['date'] = ''
resp.form['action'] = 'listing'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 5
resp.form['user_id'].force_value('12')
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 0
resp.form['user_id'].force_value(superuser.id)
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 5
assert resp.form['user_id'].value == str(superuser.id)
assert resp.form['user_id'].options[-1] == (str(superuser.id), True, 'admin')
resp.form['action'] = 'export.csv'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 2
assert resp.pyquery('[data-widget-name="object_id"]').attr.style == 'display: none'
resp.form['object'] = f'formdef:{formdef2.id}'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 1
assert resp.pyquery('[data-widget-name="object_id"]').attr.style != 'display: none'
resp.form['action'] = ''
resp.form['object_id'] = formdata2.id
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 1
resp.form['object_id'] = 'XXX'
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 0
# check journal is still displayed correctly after formdef removal
resp.form['object_id'] = ''
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 2
assert resp.pyquery('.journal-table--description')[-1].text == 'CSV Export - form title 2'
formdef2.remove_self()
resp = resp.form.submit('submit')
assert resp.pyquery('tbody tr').length == 2
assert resp.pyquery('.journal-table--description')[-1].text == 'CSV Export - form title 2'

48
tests/test_audit.py Normal file
View File

@ -0,0 +1,48 @@
import pytest
from django.utils.timezone import now
from wcs.audit import Audit
from wcs.formdef import FormDef
from wcs.qommon import audit
from wcs.qommon.http_request import HTTPRequest
from .utilities import clean_temporary_pub, create_temporary_pub
@pytest.fixture
def pub(request):
pub = create_temporary_pub()
req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
pub.set_app_dir(req)
pub.load_site_options()
return pub
def teardown_module(module):
clean_temporary_pub()
def test_audit_clean_job(pub, freezer):
Audit.wipe()
FormDef.wipe()
pub.user_class.wipe()
user = pub.user_class()
user.name = 'Test'
user.store()
formdef = FormDef()
formdef.name = 'form title'
formdef.store()
current_timestamp = now()
freezer.move_to('2018-12-01T00:00:00')
audit('listing', obj=formdef, user=user.id)
audit('listing', obj=formdef, user=user.id)
freezer.move_to(current_timestamp)
audit('listing', obj=formdef, user=user.id)
assert Audit.count() == 3
Audit.clean()
assert Audit.count() == 1

View File

@ -127,6 +127,7 @@ def create_temporary_pub(pickle_mode=False, lazy_mode=False):
pub.cfg['postgresql'] = {'database': known_elements.sql_db_name, 'user': os.environ['USER']} pub.cfg['postgresql'] = {'database': known_elements.sql_db_name, 'user': os.environ['USER']}
pub.loggederror_class.wipe() pub.loggederror_class.wipe()
sql.WorkflowTrace.wipe() sql.WorkflowTrace.wipe()
sql.Audit.wipe()
pub.write_cfg() pub.write_cfg()
return pub return pub
@ -163,6 +164,7 @@ def create_temporary_pub(pickle_mode=False, lazy_mode=False):
sql.do_custom_views_table() sql.do_custom_views_table()
sql.do_snapshots_table() sql.do_snapshots_table()
sql.do_loggederrors_table() sql.do_loggederrors_table()
sql.Audit.do_table()
sql.do_meta_table() sql.do_meta_table()
TestDef.do_table() TestDef.do_table()
sql.WorkflowTrace.do_table() sql.WorkflowTrace.do_table()

98
wcs/audit.py Normal file
View File

@ -0,0 +1,98 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2022 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
from django.utils.timezone import now
from quixote import get_publisher, get_request
from wcs import sql
from wcs.qommon import _
class Audit(sql.Audit):
id = None
timestamp = None
action = None
url = None
user_id = None
object_type = None # (formdef, carddef, etc.)
object_id = None
data_id = None # (for formdata and carddata)
extra_data = None
@classmethod
def record(cls, action, obj=None, user_id=None, **kwargs):
audit = cls()
audit.action = action
audit.timestamp = now()
request = get_request()
user = None
if user_id:
audit.user_id = user_id
user = get_publisher().user_class.get(audit.user_id, ignore_errors=True)
elif request:
user = request.get_user()
if user and hasattr(user, 'id'):
audit.user_id = user.id
if request:
audit.url = request.get_path_query()
if obj:
if hasattr(obj, '_formdef'): # formdata or carddata
audit.data_id = obj.id
obj = obj._formdef
audit.object_type = obj.xml_root_node
audit.object_id = obj.id
audit.extra_data = kwargs
audit.frozen = {
'user_email': getattr(user, 'email', None),
'user_full_name': getattr(user, 'display_name', None),
'user_nameid': getattr(user, 'nameid', None),
'object_slug': getattr(obj, 'slug', None),
'object_name': getattr(obj, 'name', None),
}
audit.store()
@classmethod
def get_action_labels(cls):
return {
'listing': _('Listing'),
'export.csv': _('CSV Export'),
'export.ods': _('ODS Export'),
'download file': _('Download of attached file'),
'download files': _('Download of attached files (bundle)'),
'view': _('View Data'),
}
def get_action_description(self):
action_label = self.get_action_labels().get(self.action, self.action)
obj_name = self.frozen.get('object_name')
if self.object_type and self.object_id:
obj_class = get_publisher().get_object_class(self.object_type)
obj = obj_class.get(self.object_id, ignore_errors=True)
if obj:
obj_name = obj.name
parts = [str(action_label), obj_name]
if self.data_id:
parts.append(str(self.data_id))
if self.extra_data and self.extra_data.get('extra_label'):
parts.append(self.extra_data.get('extra_label'))
return ' - '.join(parts)
@classmethod
def clean(cls, **kwargs):
audit_retention_days = get_publisher().get_site_option('audit-retention-days') or 730
Audit.wipe(clause=[sql.Less('timestamp', now() - datetime.timedelta(days=int(audit_retention_days)))])

154
wcs/backoffice/journal.py Normal file
View File

@ -0,0 +1,154 @@
# w.c.s. - web application for online forms
# Copyright (C) 2005-2022 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
from quixote import get_publisher, get_request, get_response
from quixote.directory import Directory
from wcs.carddef import CardDef
from wcs.formdef import FormDef
from wcs.qommon import _, template
from wcs.qommon.backoffice.menu import html_top
from wcs.qommon.form import DateWidget, Form, OptGroup, SingleSelectWidget, StringWidget
from wcs.qommon.misc import get_as_datetime
from wcs.qommon.storage import Equal, Greater, GreaterOrEqual, Less, Nothing
class JournalDirectory(Directory):
_q_exports = ['']
def html_top(self, title):
return html_top('journal', title)
def _q_traverse(self, path):
get_response().breadcrumb.append(('journal/', _('Audit Journal')))
return super()._q_traverse(path)
def _q_index(self):
from wcs.audit import Audit
self.html_top(_('Audit Journal'))
context = {
'has_sidebar': True,
'html_form': self.get_filter_form(),
}
criterias = []
querystring_parts = []
order_by = '-id'
if get_request().form.get('date'):
dt = get_as_datetime(get_request().form.get('date'))
dtm = dt + datetime.timedelta(days=1)
criterias.append(Less('timestamp', dtm))
criterias.append(GreaterOrEqual('timestamp', dt))
querystring_parts.append('date=%s' % get_request().form.get('date'))
if get_request().form.get('action'):
criterias.append(Equal('action', get_request().form.get('action')))
querystring_parts.append('action=%s' % get_request().form.get('action'))
if get_request().form.get('user_id'):
criterias.append(Equal('user_id', get_request().form.get('user_id')))
querystring_parts.append('user_id=%s' % get_request().form.get('user_id'))
if get_request().form.get('object'):
object_type, object_id = get_request().form.get('object').split(':')
criterias.append(Equal('object_type', object_type))
criterias.append(Equal('object_id', object_id))
querystring_parts.append('object=%s' % get_request().form.get('object'))
formdata_id = get_request().form.get('object_id')
if formdata_id:
querystring_parts.append('object_id=%s' % formdata_id)
try:
formdata_id = int(formdata_id)
except ValueError:
criterias.append(Nothing())
else:
criterias.append(Equal('data_id', formdata_id))
if get_request().form.get('max'):
criterias.append(Less('id', get_request().form.get('max')))
elif get_request().form.get('min'):
criterias.append(Greater('id', get_request().form.get('min')))
order_by = 'id'
context['lines'] = lines = Audit.select(criterias, order_by=order_by, limit=10)
first_id = Audit.get_first_id()
if order_by == 'id':
lines.reverse()
if len(lines) < 10 and get_request().form.get('min'):
get_request().form['min'] = None
return self._q_index()
if lines:
context['last_row_id'] = max(x.id for x in lines)
context['first_row_id'] = min(x.id for x in lines)
elif get_request().form.get('min'):
get_request().form['min'] = None
return self._q_index()
if first_id in [x.id for x in lines]:
# on latest page
context['no_next'] = True
if not get_request().form.get('min') and not get_request().form.get('max'):
context['no_prev'] = True
context['latest_page_id'] = first_id + 10
context['extra_qs'] = '&'.join(querystring_parts)
return template.QommonTemplateResponse(
templates=['wcs/backoffice/journal.html'], context=context, is_django_native=True
)
def is_accessible(self, user):
return user.can_go_in_admin()
def get_filter_form(self):
from wcs.audit import Audit
get_response().add_javascript(['select2.js'])
form = Form(method='get', action='.', id='journal-filter')
form.add(DateWidget, 'date', title=_('Date'), date_in_the_past=True, date_can_be_today=True)
user_options = [(None, '', '')]
if get_request().form.get('user_id'):
user = get_publisher().user_class.get(get_request().form.get('user_id'), ignore_errors=True)
if user:
user_options.append((user.id, str(user), user.id))
form.add(
SingleSelectWidget, 'user_id', title=_('User'), options=user_options, class_='user-selection'
)
formdefs = FormDef.select(order_by='name', lightweight=True, ignore_errors=True)
carddefs = CardDef.select(order_by='name', lightweight=True, ignore_errors=True)
object_options = [(None, '', '')]
if formdefs and carddefs:
object_options.append(OptGroup(_('Forms')))
object_options.extend([(f'formdef:{x.id}', x.name, f'formdef:{x.id}') for x in formdefs])
if formdefs and carddefs:
object_options.append(OptGroup(_('Card Models')))
object_options.extend([(f'carddef:{x.id}', x.name, f'carddef:{x.id}') for x in carddefs])
form.add(SingleSelectWidget, 'object', title=_('Form/Card'), options=object_options)
widget = form.add(StringWidget, 'object_id', title=_('Form/Card Identifier'))
if not form.get_widget('object').parse():
widget.is_hidden = True
form.add(
SingleSelectWidget,
'action',
title=_('Action'),
options=[('', '', '')] + [(x[0], x[1], x[0]) for x in Audit.get_action_labels().items()],
)
form.add_submit('submit', _('Search'))
return form

View File

@ -46,7 +46,7 @@ from wcs.roles import logged_users_role
from wcs.variables import LazyFieldVar, LazyList from wcs.variables import LazyFieldVar, LazyList
from wcs.workflows import WorkflowStatusItem, item_classes, template_on_formdata from wcs.workflows import WorkflowStatusItem, item_classes, template_on_formdata
from ..qommon import _, errors, ezt, force_str, get_cfg, misc, ngettext, ods, pgettext_lazy, template from ..qommon import _, audit, errors, ezt, force_str, get_cfg, misc, ngettext, ods, pgettext_lazy, template
from ..qommon.afterjobs import AfterJob from ..qommon.afterjobs import AfterJob
from ..qommon.backoffice.listing import pagination_links from ..qommon.backoffice.listing import pagination_links
from ..qommon.backoffice.menu import html_top from ..qommon.backoffice.menu import html_top
@ -2200,6 +2200,9 @@ class FormPage(FormdefDirectoryBase):
multi_form.widgets.append(HtmlWidget(table)) multi_form.widgets.append(HtmlWidget(table))
if not multi_actions: if not multi_actions:
multi_form.widgets.append(HtmlWidget('<div class="buttons"></div>')) multi_form.widgets.append(HtmlWidget('<div class="buttons"></div>'))
audit('listing', obj=self.formdef, refresh=bool(get_request().form.get('ajax') == 'true'))
if get_request().form.get('ajax') == 'true': if get_request().form.get('ajax') == 'true':
get_request().ignore_session = True get_request().ignore_session = True
get_response().filter = {'raw': True} get_response().filter = {'raw': True}
@ -2313,6 +2316,7 @@ class FormPage(FormdefDirectoryBase):
get_request().form = parse_query(form.get_widget('query_string').parse() or '', 'utf-8') get_request().form = parse_query(form.get_widget('query_string').parse() or '', 'utf-8')
get_request().form['skip_header_line'] = not (form.get_widget('include_header_line').parse()) get_request().form['skip_header_line'] = not (form.get_widget('include_header_line').parse())
file_format = form.get_widget('format').parse() file_format = form.get_widget('format').parse()
audit('export.%s' % file_format, obj=self.formdef)
if file_format == 'csv': if file_format == 'csv':
return self.csv() return self.csv()
elif file_format == 'json': elif file_format == 'json':
@ -3354,6 +3358,7 @@ class FormBackOfficeStatusPage(FormStatusPage):
if isinstance(subvalue_elem, PicklableUpload): if isinstance(subvalue_elem, PicklableUpload):
add_zip_file(subvalue_elem, zip_file) add_zip_file(subvalue_elem, zip_file)
audit('download files', obj=formdata)
response = get_response() response = get_response()
response.set_content_type('application/zip') response.set_content_type('application/zip')
response.set_header( response.set_header(

View File

@ -34,13 +34,14 @@ from ..qommon.backoffice.menu import html_top
from .cards import CardsDirectory from .cards import CardsDirectory
from .data_management import DataManagementDirectory from .data_management import DataManagementDirectory
from .i18n import I18nDirectory from .i18n import I18nDirectory
from .journal import JournalDirectory
from .management import ManagementDirectory from .management import ManagementDirectory
from .studio import StudioDirectory from .studio import StudioDirectory
from .submission import SubmissionDirectory from .submission import SubmissionDirectory
class RootDirectory(BackofficeRootDirectory): class RootDirectory(BackofficeRootDirectory):
_q_exports = ['', 'pending', 'statistics', ('menu.json', 'menu_json'), 'processing'] _q_exports = ['', 'pending', 'statistics', ('menu.json', 'menu_json'), 'processing', 'journal']
forms = wcs.admin.forms.FormsDirectory() forms = wcs.admin.forms.FormsDirectory()
roles = wcs.admin.roles.RolesDirectory() roles = wcs.admin.roles.RolesDirectory()
@ -48,6 +49,7 @@ class RootDirectory(BackofficeRootDirectory):
users = wcs.admin.users.UsersDirectory() users = wcs.admin.users.UsersDirectory()
workflows = wcs.admin.workflows.WorkflowsDirectory() workflows = wcs.admin.workflows.WorkflowsDirectory()
management = ManagementDirectory() management = ManagementDirectory()
journal = JournalDirectory()
studio = StudioDirectory() studio = StudioDirectory()
cards = CardsDirectory() cards = CardsDirectory()
data = DataManagementDirectory() data = DataManagementDirectory()

View File

@ -120,6 +120,8 @@ class StudioDirectory(Directory):
object_types += [CardDef] object_types += [CardDef]
if backoffice_root.is_accessible('i18n') and get_publisher().has_i18n_enabled(): if backoffice_root.is_accessible('i18n') and get_publisher().has_i18n_enabled():
extra_links.append(('../i18n/', pgettext('studio', 'Multilinguism'))) extra_links.append(('../i18n/', pgettext('studio', 'Multilinguism')))
if backoffice_root.is_accessible('journal'):
extra_links.append(('../journal/', pgettext('studio', 'Audit Journal')))
user = get_request().user user = get_request().user
context = { context = {

View File

@ -34,7 +34,7 @@ from wcs.qommon.admin.texts import TextsDirectory
from wcs.wf.editable import EditableWorkflowStatusItem from wcs.wf.editable import EditableWorkflowStatusItem
from wcs.workflows import RedisplayFormException from wcs.workflows import RedisplayFormException
from ..qommon import _, errors, misc, template from ..qommon import _, audit, errors, misc, template
class FileDirectory(Directory): class FileDirectory(Directory):
@ -80,6 +80,10 @@ class FileDirectory(Directory):
redirect_url = sign_url_auto_orig(redirect_url) redirect_url = sign_url_auto_orig(redirect_url)
return redirect(redirect_url) return redirect(redirect_url)
if not self.thumbnails:
# do not log access to thumbnails as they will already be accounted for as
# a view of the formdata/carddata containing them.
audit('download file', obj=self.formdata, extra_label=component)
return self.serve_file(file, thumbnail=self.thumbnails) return self.serve_file(file, thumbnail=self.thumbnails)
@classmethod @classmethod
@ -665,6 +669,7 @@ class FormStatusPage(Directory, FormTemplateMixin):
return response return response
get_response().add_javascript(['jquery.js', 'qommon.forms.js']) get_response().add_javascript(['jquery.js', 'qommon.forms.js'])
audit('view', obj=self.filled)
self.html_top('%s - %s' % (self.formdef.name, self.filled.id)) self.html_top('%s - %s' % (self.formdef.name, self.filled.id))
r = TemplateIO(html=True) r = TemplateIO(html=True)
r += get_session().display_message() r += get_session().display_message()
@ -776,10 +781,17 @@ class FormStatusPage(Directory, FormTemplateMixin):
if not hasattr(field_data, 'file_digest'): if not hasattr(field_data, 'file_digest'):
continue continue
if field_data.file_digest() == file_digest: if field_data.file_digest() == file_digest:
return FileDirectory.serve_file( thumbnail = bool(get_request().form.get('thumbnail') and field_data.can_thumbnail())
field_data, if not thumbnail:
thumbnail=bool(get_request().form.get('thumbnail') and field_data.can_thumbnail()), # do not log access to thumbnails as they will already be accounted for as
) # a view of the formdata/carddata containing them.
audit(
'download file',
obj=self.filled,
extra_label=str(field_data),
file_digest=file_digest,
)
return FileDirectory.serve_file(field_data, thumbnail=thumbnail)
elif get_request().form and get_request().form.get('f'): elif get_request().form and get_request().form.get('f'):
try: try:
fn = get_request().form['f'] fn = get_request().form['f']

View File

@ -153,6 +153,12 @@ class WcsPublisher(QommonPublisher):
cls.register_cronjob( cls.register_cronjob(
CronJob(cls.clean_deleted_users, name='clean_deleted_users', hours=[3], minutes=[0]) CronJob(cls.clean_deleted_users, name='clean_deleted_users', hours=[3], minutes=[0])
) )
# once a day clean old audit entries
from .audit import Audit
cls.register_cronjob(CronJob(Audit.clean, name='clean_audit', hours=[3], minutes=[0]))
# other jobs # other jobs
data_sources.register_cronjob() data_sources.register_cronjob()
formdef.register_cronjobs() formdef.register_cronjobs()
@ -399,6 +405,7 @@ class WcsPublisher(QommonPublisher):
sql.do_loggederrors_table() sql.do_loggederrors_table()
sql.do_tokens_table() sql.do_tokens_table()
sql.WorkflowTrace.do_table() sql.WorkflowTrace.do_table()
sql.Audit.do_table()
sql.do_meta_table() sql.do_meta_table()
from .carddef import CardDef from .carddef import CardDef
from .formdef import FormDef from .formdef import FormDef
@ -485,6 +492,33 @@ class WcsPublisher(QommonPublisher):
pass pass
return logged_exception return logged_exception
def get_object_class(self, object_type):
from wcs.blocks import BlockDef
from wcs.carddef import CardDef
from wcs.categories import BlockCategory, CardDefCategory, Category, WorkflowCategory
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
for klass in (
BlockDef,
CardDef,
NamedDataSource,
FormDef,
Workflow,
NamedWsCall,
MailTemplate,
Category,
CardDefCategory,
WorkflowCategory,
BlockCategory,
):
if klass.xml_root_node == object_type:
return klass
raise KeyError('no class for object type: %s' % object_type)
def apply_global_action_timeouts(self, **kwargs): def apply_global_action_timeouts(self, **kwargs):
from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger

View File

@ -38,6 +38,12 @@ force_str = force_text
PICKLE_KWARGS = {'encoding': 'bytes', 'fix_imports': True} PICKLE_KWARGS = {'encoding': 'bytes', 'fix_imports': True}
def audit(action, **kwargs):
from wcs.audit import Audit
Audit.record(action, **kwargs)
def gettext(message): def gettext(message):
pub = get_publisher() pub = get_publisher()
if pub is None: if pub is None:

View File

@ -922,14 +922,22 @@ div.bo-block div#page-links {
padding: 1ex 1.5ex; padding: 1ex 1.5ex;
} }
#page-links .previous-page:before { .first-page:before {
content: "";
}
.previous-page:before {
content: "<"; content: "<";
} }
#page-links .next-page:before { .next-page:before {
content: ">"; content: ">";
} }
.last-page:before {
content: "";
}
div#page-links a { div#page-links a {
padding: 1ex 1.5ex; padding: 1ex 1.5ex;
border: none; border: none;
@ -2605,3 +2613,7 @@ div#main-content > h3.field-edit--subtitle {
max-height: 10em; max-height: 10em;
overflow-y: auto; overflow-y: auto;
} }
.journal-table--datetime {
width: 10em;
}

View File

@ -140,6 +140,14 @@ $(function() {
}); });
}); });
$('#journal-filter #form_object').on('change', function() {
if (this.value) {
$('[data-widget-name="object_id"]').show();
} else {
$('[data-widget-name="object_id"]').hide();
}
});
/* keep title/slug in sync */ /* keep title/slug in sync */
$('body').delegate('input[data-slug-sync]', 'input change paste', $('body').delegate('input[data-slug-sync]', 'input change paste',
function() { function() {
@ -185,6 +193,7 @@ $(function() {
url: '/api/users/' url: '/api/users/'
}, },
placeholder: '-', placeholder: '-',
allowClear: true,
templateResult: function (state) { templateResult: function (state) {
if (!state.description) { if (!state.description) {
return state.text; return state.text;
@ -197,9 +206,9 @@ $(function() {
return $template_string; return $template_string;
} }
} }
if ($('div.submit-user-selection').length) { if ($('div.submit-user-selection, select.user-selection').length) {
$('div.submit-user-selection select').select2(user_select2_options); $('div.submit-user-selection select, select.user-selection').select2(user_select2_options);
$('div.submit-user-selection select').on('select2:open', function (e) { $('div.submit-user-selection select, select.user-selection').on('select2:open', function (e) {
var available_height = $(window).height() - $(this).offset().top; var available_height = $(window).height() - $(this).offset().top;
$('ul.select2-results__options').css('max-height', (available_height - 100) + 'px'); $('ul.select2-results__options').css('max-height', (available_height - 100) + 'px');
}); });
@ -341,6 +350,27 @@ $(function() {
}); });
$('[type=radio][name=display_mode]:checked').trigger('change'); $('[type=radio][name=display_mode]:checked').trigger('change');
function prepate_journal_links() {
$('#journal-page-links a').on('click', function() {
var url = $(this).attr('href');
$.ajax({url: url, dataType: 'html', success: function(html) {
var $html = $(html);
var $table = $html.find('#journal-table');
var $page_links = $html.find('#journal-page-links');
if ($table.length && $page_links.length) {
$('#journal-table').replaceWith($table);
$('#journal-page-links').replaceWith($page_links);
prepate_journal_links();
if (window.history) {
window.history.replaceState(null, null, url);
}
}
}});
return false;
});
}
prepate_journal_links();
// IE doesn't accept periods or dashes in the window name, but the element IDs // IE doesn't accept periods or dashes in the window name, but the element IDs
// we use to generate popup window names may contain them, therefore we map them // we use to generate popup window names may contain them, therefore we map them
// to allowed characters in a reversible way so that we can locate the correct // to allowed characters in a reversible way so that we can locate the correct

View File

@ -196,35 +196,11 @@ class Snapshot:
return instances return instances
def get_object_class(self): def get_object_class(self):
return Snapshot.get_class(self.object_type) return get_publisher().get_object_class(self.object_type)
@classmethod @classmethod
def get_class(cls, object_type): def get_class(cls, object_type):
from wcs.blocks import BlockDef return get_publisher().get_object_class(object_type)
from wcs.carddef import CardDef
from wcs.categories import BlockCategory, CardDefCategory, Category, WorkflowCategory
from wcs.data_sources import NamedDataSource
from wcs.formdef import FormDef
from wcs.mail_templates import MailTemplate
from wcs.workflows import Workflow
from wcs.wscalls import NamedWsCall
for klass in (
BlockDef,
CardDef,
NamedDataSource,
FormDef,
Workflow,
NamedWsCall,
MailTemplate,
Category,
CardDefCategory,
WorkflowCategory,
BlockCategory,
):
if klass.xml_root_node == object_type:
return klass
raise KeyError('no class for object type: %s' % object_type)
def get_serialization(self, indented=True): def get_serialization(self, indented=True):
# there is a complete serialization # there is a complete serialization

View File

@ -4571,6 +4571,117 @@ class WorkflowTrace(SqlMixin):
formdata.store() formdata.store()
class Audit(SqlMixin):
_table_name = 'audit'
_table_static_fields = [
('id', 'bigserial'),
('timestamp', 'timestamptz'),
('action', 'varchar'),
('url', 'varchar'),
('user_id', 'varchar'),
('object_type', 'varchar'),
('object_id', 'varchar'),
('data_id', 'int'),
('extra_data', 'jsonb'),
('frozen', 'jsonb'), # plain copy of user email, object name and slug
]
id = None
@classmethod
@guard_postgres
def do_table(cls):
conn, cur = get_connection_and_cursor()
table_name = cls._table_name
cur.execute(
'''SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = %s''',
(table_name,),
)
if cur.fetchone()[0] == 0:
cur.execute(
'''CREATE TABLE %s (id BIGSERIAL,
timestamp TIMESTAMP WITH TIME ZONE,
action VARCHAR,
url VARCHAR,
user_id VARCHAR,
user_email VARCHAR,
object_type VARCHAR,
object_id VARCHAR,
data_id INTEGER,
extra_data JSONB,
frozen JSONB
)'''
% table_name
)
cur.execute(
'''SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s''',
(table_name,),
)
existing_fields = {x[0] for x in cur.fetchall()}
needed_fields = {x[0] for x in Audit._table_static_fields}
# delete obsolete fields
for field in existing_fields - needed_fields:
cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
cur.execute('CREATE INDEX IF NOT EXISTS audit_id_idx ON audit USING btree (id)')
conn.commit()
cur.close()
@guard_postgres
def store(self):
if self.id:
# do not allow updates
raise AssertionError()
sql_dict = {x: getattr(self, x) for x, y in self._table_static_fields}
conn, cur = get_connection_and_cursor()
column_names = [x for x in sql_dict.keys() if x != 'id']
sql_statement = '''INSERT INTO %s (%s)
VALUES (%s)
RETURNING id''' % (
self._table_name,
', '.join(column_names),
', '.join(['%%(%s)s' % x for x in column_names]),
)
cur.execute(sql_statement, sql_dict)
self.id = cur.fetchone()[0]
conn.commit()
cur.close()
@classmethod
def _row2ob(cls, row, **kwargs):
o = cls()
for field, value in zip(cls._table_static_fields, tuple(row)):
setattr(o, field[0], value)
return o
@classmethod
def get_data_fields(cls):
return []
@classmethod
@guard_postgres
def get_first_id(cls):
conn, cur = get_connection_and_cursor()
sql_statement = 'SELECT id FROM audit ORDER BY id LIMIT 1'
cur.execute(sql_statement)
try:
lguerin marked this conversation as resolved Outdated

Ca n'arrivera qu'une fois, mais en local après migration et consultation de la page d'audit, j'ai une trace:

Exception:
  type = '<class 'IndexError'>', value = 'list index out of range'

Stack trace (most recent call first):
  File "/home/lguerin/src/wcs/wcs/sql.py", line 4676, in get_first_id
  4674         sql_statement = 'SELECT id FROM audit ORDER BY id LIMIT 1'
  4675         cur.execute(sql_statement)
> 4676         first_id = cur.fetchall()[0][0]
  4677         conn.commit()
  4678         cur.close()
Ca n'arrivera qu'une fois, mais en local après migration et consultation de la page d'audit, j'ai une trace: ``` Exception: type = '<class 'IndexError'>', value = 'list index out of range' Stack trace (most recent call first): File "/home/lguerin/src/wcs/wcs/sql.py", line 4676, in get_first_id 4674 sql_statement = 'SELECT id FROM audit ORDER BY id LIMIT 1' 4675 cur.execute(sql_statement) > 4676 first_id = cur.fetchall()[0][0] 4677 conn.commit() 4678 cur.close() ```

Pris en compte pour désormais faire

+        try:
+            first_id = cur.fetchall()[0][0]
+        except IndexError:
+            first_id = 0
Pris en compte pour désormais faire ``` + try: + first_id = cur.fetchall()[0][0] + except IndexError: + first_id = 0 ```
first_id = cur.fetchall()[0][0]
except IndexError:
first_id = 0
conn.commit()
cur.close()
return first_id
class classproperty: class classproperty:
def __init__(self, f): def __init__(self, f):
self.f = f self.f = f
@ -4949,7 +5060,7 @@ def get_period_total(
# latest migration, number + description (description is not used # latest migration, number + description (description is not used
# programmaticaly but will make sure git conflicts if two migrations are # programmaticaly but will make sure git conflicts if two migrations are
# separately added with the same number) # separately added with the same number)
SQL_LEVEL = (77, 'use token table for nonces') SQL_LEVEL = (78, 'add audit table')
def migrate_global_views(conn, cur): def migrate_global_views(conn, cur):
@ -5138,6 +5249,9 @@ def migrate():
# 75: migrate to dedicated workflow traces table # 75: migrate to dedicated workflow traces table
# 76: add index to workflow traces table # 76: add index to workflow traces table
WorkflowTrace.do_table() WorkflowTrace.do_table()
if sql_level < 78:
# 78: add audit table
Audit.do_table()
if sql_level < 52: if sql_level < 52:
# 2: introduction of formdef_id in views # 2: introduction of formdef_id in views
# 5: add concerned_roles_array, is_at_endpoint and fts to views # 5: add concerned_roles_array, is_at_endpoint and fts to views

View File

@ -0,0 +1,50 @@
{% extends "wcs/backoffice.html" %}
{% load i18n %}
{% block content %}
{% block appbar %}
<div id="appbar">
<h2>{% trans "Audit Journal" %}</h2>
</div>
{% endblock %}
<table id="journal-table" class="main">
<thead>
<tr>
<th>{% trans "Date" %}</th>
<th>{% trans "User" %}</th>
<th>{% trans "Action" %}</th>
</tr>
</thead>
<tbody>
{% for line in lines %}
<tr>
<td class="journal-table--datetime">{{ line.timestamp|date:"DATETIME_FORMAT" }}</td>
<td class="journal-table--user">{{ line.frozen.user_full_name|default:"-" }}</td>
<td class="journal-table--description">{{ line.get_action_description }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<div id="journal-page-links">
<a class="button first-page {% if no_prev%}disabled{% endif %}"
title="{% trans "Most recent activity" %}"
href="./?{{ extra_qs }}"><span class="sr-only">{% trans "First page" %}</span></a>
<a class="button previous-page {% if no_prev%}disabled{% endif %}"
title="{% trans "More recent activity" %}"
href="./?min={{ last_row_id }}&{{ extra_qs }}"><span class="sr-only">{% trans "Previous page" %}</span></a>
<a class="button next-page {% if no_next %}disabled{% endif %}"
title="{% trans "More ancient activity" %}"
href="./?max={{ first_row_id }}&{{ extra_qs }}"><span class="sr-only">{% trans "Next page" %}</span></a>
<a class="button last-page {% if no_next %}disabled{% endif %}"
title="{% trans "Most ancient activity" %}"
href="./?max={{ latest_page_id }}&{{ extra_qs }}"><span class="sr-only">{% trans "Last page" %}</span></a>
</div>
{% endblock %}
{% block sidebar-content %}
<h3>{% trans "Search" %}</h3>
{{ html_form.render|safe }}
{% endblock %}