128 lines
5.0 KiB
Python
128 lines
5.0 KiB
Python
# w.c.s. - web application for online forms
# Copyright (C) 2005-2022 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from wcs import sql
|
|
from wcs.qommon import _
|
|
|
|
|
|
class WorkflowTrace(sql.WorkflowTrace):
    """Workflow trace with helpers for per-formdata lookup, labels and admin links."""

    @staticmethod
    def _formdata_criterias(formdata):
        """Return the SQL criteria selecting the traces attached to ``formdata``."""
        return [
            sql.Equal('formdef_type', formdata.formdef.xml_root_node),
            sql.Equal('formdef_id', formdata.formdef.id),
            sql.Equal('formdata_id', formdata.id),
        ]

    @classmethod
    def select_for_formdata(cls, formdata):
        """Return the traces recorded for ``formdata``, ordered by timestamp."""
        return cls.select(cls._formdata_criterias(formdata), order_by='timestamp')

    @classmethod
    def wipe_for_formdata(cls, formdata):
        """Remove the traces recorded for ``formdata``."""
        return cls.wipe(clause=cls._formdata_criterias(formdata))

    def get_event_label(self):
        """Return the translated label for ``self.event``.

        Events without a known label are returned unchanged.
        """
        # The mapping is built on each call so that _() is evaluated at call time.
        return {
            'api-created': _('Created (by API)'),
            'api-post-edit-action': _('Actions after edit action (by API)'),
            'api-trigger': _('API Trigger'),
            'backoffice-created': _('Created (backoffice submission)'),
            'button': _('Action button'),
            'continuation': _('Continuation'),
            'csv-import-created': _('Created (by CSV import)'),
            'edit-action': _('Actions after edit action'),
            'email-button': _('Email action button'),
            'mass-jump': _('Mass jump action'),
            'frontoffice-created': _('Created (frontoffice submission)'),
            'global-action-button': _('Click on a global action button'),
            'global-action': _('Global action'),
            'global-action-mass': _('Mass global action'),
            'global-action-timeout': _('Global action timeout'),
            'global-api-trigger': _('API Trigger'),
            'global-interactive-action': _('Global action (interactive)'),
            'global-external-workflow': _('Trigger by external workflow'),
            'json-import-created': _('Created (by JSON import)'),
            'timeout-jump': _('Timeout jump'),
            'workflow-created': _('Created (by workflow action)'),
            'workflow-edited': _('Edited (by workflow action)'),
            'workflow-form-submit': _('Action in workflow form'),
        }.get(self.event, self.event)

    def is_global_event(self):
        """Return True when the event name starts with ``global-``."""
        return bool(self.event and self.event.startswith('global-'))

    @property
    def external_workflow(self):
        """Workflow referenced by ``event_args['external_workflow_id']``, or None.

        The result is cached on the instance.  None is stored *before* the
        lookup, so if ``Workflow.get`` raises, the exception propagates on the
        first access only and later accesses return None; callers must
        therefore be prepared for both outcomes.
        """
        if not hasattr(self, '_external_workflow'):
            self._external_workflow = None
            # event_args may be empty (the same guard is applied to
            # global_event.event_args below); treat that as "no reference".
            workflow_id = (self.event_args or {}).get('external_workflow_id')
            if workflow_id:
                # Imported here rather than at module level
                # (presumably to avoid a circular import -- TODO confirm).
                from wcs.workflows import Workflow

                self._external_workflow = Workflow.get(workflow_id)
        return self._external_workflow

    def get_external_url(self, global_event):
        """Return the admin URL of the external workflow item behind this trace.

        Falls back to ``#missing-<item id>`` when the workflow, status or
        global action cannot be resolved (signalled by KeyError).
        """
        event_args = self.event_args or {}
        item_id = event_args.get('external_item_id')
        try:
            base_url = self.get_base_url(
                self.external_workflow,
                event_args.get('external_status_id'),
                global_event,
            )
        except KeyError:
            return '#missing-%s' % item_id
        return '%sitems/%s/' % (base_url, item_id)

    def get_base_url(self, workflow, status_id, global_event):
        """Return the admin URL of the global action or status holding an item.

        Raises KeyError when ``workflow`` is None, when ``global_event`` has no
        event arguments, or (propagated from ``workflow.get_status``) when the
        status cannot be found.
        """
        if workflow is None:
            # A missing workflow is reported like a missing status so that
            # callers catching KeyError get their fallback instead of an
            # AttributeError on None.
            raise KeyError()
        if global_event:
            if not global_event.event_args:
                raise KeyError()
            return '%sglobal-actions/%s/' % (
                workflow.get_admin_url(),
                global_event.event_args.get('global_action_id'),
            )
        status = workflow.get_status(status_id)
        return status.get_admin_url()

    def get_real_action(self, workflow, status_id, action_id, global_event):
        """Return the workflow item whose id is ``action_id``, or None.

        The item is looked up in the global action referenced by
        ``global_event`` when one is given, otherwise in the status
        ``status_id``.  None is returned whenever the workflow, container or
        item is missing.
        """
        if workflow is None:
            return None
        if global_event:
            if not global_event.event_args:
                return None
            global_action_id = global_event.event_args.get('global_action_id')
            global_action = next((x for x in workflow.global_actions if x.id == global_action_id), None)
            if global_action is None:
                return None
            items = global_action.items
        else:
            try:
                status = workflow.get_status(status_id)
            except KeyError:
                return None
            items = status.items
        return next((x for x in items if x.id == action_id), None)
|