wcs/wcs/admin/workflows.py

2217 lines
84 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2010 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import io
import itertools
import json
import textwrap
import time
import xml.etree.ElementTree as ET
from subprocess import PIPE, Popen
from django.utils.encoding import force_bytes
from quixote import get_publisher, get_request, get_response, get_session, redirect
from quixote.directory import Directory
from quixote.html import TemplateIO, htmltext
from wcs.admin.categories import WorkflowCategoriesDirectory, get_categories
from wcs.backoffice.snapshots import SnapshotsDirectory
from wcs.carddef import CardDef
from wcs.categories import WorkflowCategory
from wcs.formdata import Evolution
from wcs.formdef import FormDef, UpdateStatisticsDataAfterJob
from wcs.qommon import _, errors, force_str, misc, template
from wcs.qommon.admin.menu import command_icon
from wcs.qommon.backoffice.menu import html_top
from wcs.qommon.form import (
CheckboxWidget,
ColourWidget,
CompositeWidget,
FileWidget,
Form,
HtmlWidget,
SingleSelectWidget,
StringWidget,
UrlWidget,
VarnameWidget,
WysiwygTextWidget,
)
from wcs.qommon.storage import Equal
from wcs.workflows import (
DuplicateGlobalActionNameError,
DuplicateStatusNameError,
Workflow,
WorkflowBackofficeFieldsFormDef,
WorkflowCriticalityLevel,
WorkflowGlobalAction,
WorkflowImportError,
WorkflowVariablesFieldsFormDef,
item_classes,
)
from . import utils
from .comment_templates import CommentTemplatesDirectory
from .data_sources import NamedDataSourcesDirectory
from .fields import FieldDefPage, FieldsDirectory
from .logged_errors import LoggedErrorsDirectory
from .mail_templates import MailTemplatesDirectory
def is_global_accessible():
return get_publisher().get_backoffice_root().is_global_accessible('workflows')
def update_order(elements):
request = get_request()
new_order = request.form['order'].strip(';').split(';')
new_elements = []
for y in new_order:
element = [x for x in elements if x.id == y]
if not element:
continue
new_elements.append(element[0])
if {element.id for element in new_elements} != {element.id for element in elements}:
return None
return new_elements
def svg(tag):
return '{http://www.w3.org/2000/svg}%s' % tag
def xlink(tag):
return '{http://www.w3.org/1999/xlink}%s' % tag
TITLE = svg('title')
POLYGON = svg('polygon')
XLINK_TITLE = xlink('title')
def remove_tag(node, tag):
for child in node:
if child.tag == tag:
node.remove(child)
def remove_attribute(node, att):
if att in node.attrib:
del node.attrib[att]
def adjust_style(node, top, colours, white_text=False, colour_class=None):
remove_tag(node, TITLE)
if node.get('class') and node.get('class').startswith('node '):
colour_class = node.get('class').split()[-1]
if (node.get('fill'), node.get('stroke')) in (('white', 'white'), ('white', 'none')):
# this is the general white background, reduce it to a dot
node.attrib['points'] = '0,0 0,0 0,0 0,0'
if node.tag == svg('text') and white_text:
node.attrib['fill'] = 'white'
for child in node:
remove_attribute(child, XLINK_TITLE)
if child.tag == '{http://www.w3.org/2000/svg}polygon' and colour_class:
# for compatibility with graphviz >= 2.40 replace fill attribute
# with the original colour name.
child.attrib['fill'] = colour_class
if child.get('fill') in colours:
matching_hexa = colours.get(child.get('fill'))
child.attrib['fill'] = '#' + matching_hexa
del child.attrib['stroke']
if misc.get_foreground_colour(matching_hexa) == 'white':
white_text = True
if child.get('font-family'):
del child.attrib['font-family']
if child.get('font-size'):
child.attrib['font-size'] = str(float(child.attrib['font-size']) * 0.8)
remove_attribute(child, 'style')
adjust_style(child, top, colours, white_text=white_text, colour_class=colour_class)
def graphviz_post_treatment(content, colours, include=False):
"""Remove all svg:title and top-level svg:polygon nodes, remove style
attributes and xlink:title attributes.
If a color style is set to a name matching class-\\w+, set the second
part on as class selector on the top level svg:g element.
"""
tree = ET.fromstring(content)
if not include:
style = ET.SubElement(tree, svg('style'))
style.attrib['type'] = 'text/css'
css_url = '%s%s%s' % (
get_publisher().get_root_url(),
get_publisher().qommon_static_dir,
get_publisher().qommon_admin_css,
)
style.text = '@import url(%s);' % css_url
for root in tree:
remove_tag(root, TITLE)
for child in root:
adjust_style(child, child, colours)
return force_str(ET.tostring(tree))
def graphviz(workflow, url_prefix='', select=None, svg=True, include=False):
out = io.StringIO()
# a list of colours known to graphviz, they will serve as key to get back
# to the colours defined in wcs, they are used as color attributes in
# graphviz (<= 2.38) then as class attribute on node elements for 2.40 and
# later.
graphviz_colours = [
'aliceblue',
'antiquewhite',
'aqua',
'aquamarine',
'azure',
'beige',
'bisque',
'black',
'blanchedalmond',
'blue',
'blueviolet',
'brown',
'burlywood',
'cadetblue',
'chartreuse',
'chocolate',
'coral',
'cornflowerblue',
'cornsilk',
'crimson',
'cyan',
'darkblue',
'darkcyan',
'darkgoldenrod',
'darkgray',
'darkgreen',
'darkgrey',
'darkkhaki',
'darkmagenta',
'darkolivegreen',
'darkorange',
'darkorchid',
'darkred',
'darksalmon',
'darkseagreen',
'darkslateblue',
'darkslategray',
'darkslategrey',
'darkturquoise',
'darkviolet',
'deeppink',
'deepskyblue',
'dimgray',
'dimgrey',
'dodgerblue',
'firebrick',
'floralwhite',
'forestgreen',
'fuchsia',
'gainsboro',
'ghostwhite',
'gold',
'goldenrod',
'gray',
'grey',
'green',
'greenyellow',
'honeydew',
'hotpink',
'indianred',
'indigo',
'ivory',
'khaki',
'lavender',
'lavenderblush',
'lawngreen',
'lemonchiffon',
'lightblue',
'lightcoral',
'lightcyan',
'lightgoldenrodyellow',
'lightgray',
'lightgrey',
'lightpink',
]
colours = {}
revert_colours = {}
print('digraph main {', file=out)
# print >>out, 'graph [ rankdir=LR ];'
print('node [shape=box,style=filled];', file=out)
print('edge [];', file=out)
for status in workflow.possible_status:
i = status.id
print('status%s' % i, end=' ', file=out)
print('[label="%s"' % status.name.replace('"', "'"), end=' ', file=out)
if select == str(i):
print(',id=current_status', file=out)
if status.colour:
if status.colour not in colours:
colours[status.colour] = graphviz_colours.pop()
revert_colours[colours[status.colour]] = status.colour
print(',color=%s' % colours[status.colour], file=out)
print(',class=%s' % colours[status.colour], file=out)
print(' URL="%sstatus/%s/"];' % (url_prefix, i), file=out)
for status in workflow.possible_status:
i = status.id
for item in status.items:
next_status_ids = [x.id for x in item.get_target_status() if x.id]
if not next_status_ids:
next_status_ids = [status.id]
done = {}
url = 'status/%s/items/%s/' % (i, item.id)
for next_id in next_status_ids:
if next_id in done:
# don't display multiple arrows for same action and target
# status
continue
print('status%s -> status%s' % (i, next_id), file=out)
done[next_id] = True
label = item.get_jump_label(target_id=next_id)
label = label.replace('"', '\\"')
label = textwrap.fill(label, 20, break_long_words=False)
label = label.replace('\n', '\\n')
label = label.replace('&', '&amp;')
print('[label="%s"' % label, end=' ', file=out)
print(',URL="%s%s"]' % (url_prefix, url), file=out)
print('}', file=out)
out = out.getvalue()
if svg:
try:
with Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) as process:
out = process.communicate(force_bytes(out))[0]
if process.returncode != 0:
return ''
except OSError:
return ''
out = graphviz_post_treatment(out, revert_colours, include=include)
if include:
# It seems webkit refuse to accept SVG when using its proper namespace,
# and xlink namespace prefix must be xlink: to be acceptable
out = out.replace('ns0:', '')
out = out.replace('xmlns:ns0', 'xmlns:svg')
out = out.replace('ns1:', 'xlink:')
out = out.replace(':ns1', ':xlink')
out = out.replace('<title>main</title>', '<title>%s</title>' % workflow.name)
return out
class WorkflowUI:
def __init__(self, workflow):
self.workflow = workflow
def get_categories(self):
global_access = is_global_accessible()
user_roles = set(get_request().user.get_roles())
def filter_function(category):
if global_access:
return True
management_roles = {x.id for x in getattr(category, 'management_roles') or []}
return bool(user_roles.intersection(management_roles))
return get_categories(WorkflowCategory, filter_function=filter_function)
def form_new(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Workflow Name'), required=True, size=30)
category_options = self.get_categories()
if category_options:
if is_global_accessible():
category_options = [(None, '---', '')] + list(category_options)
form.add(
SingleSelectWidget,
'category_id',
title=_('Category'),
options=category_options,
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
return form
def form_edit(self):
form = Form(enctype='multipart/form-data')
form.add_hidden('id', value=self.workflow.id)
form.add(
StringWidget, 'name', title=_('Workflow Name'), required=True, size=30, value=self.workflow.name
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
return form
def submit_form(self, form):
if self.workflow:
workflow = self.workflow
else:
workflow = Workflow(name=form.get_widget('name').parse())
name = form.get_widget('name').parse()
workflows_name = [x.name for x in Workflow.select() if x.id != workflow.id]
if name in workflows_name:
form.get_widget('name').set_error(_('This name is already used'))
raise ValueError()
for f in ('name', 'category_id'):
widget = form.get_widget(f)
if widget:
setattr(workflow, f, widget.parse())
workflow.store()
return workflow
class WorkflowItemPage(Directory):
_q_exports = ['', 'delete', 'copy']
def __init__(self, workflow, parent, component, html_top):
try:
self.item = [x for x in parent.items if x.id == component][0]
except (IndexError, ValueError):
raise errors.TraversalError()
self.workflow = workflow
self.parent = parent
self.html_top = html_top
get_response().breadcrumb.append(('items/%s/' % component, self.item.description))
def _q_index(self):
request = get_request()
if request.get_method() == 'GET' and request.form.get('file'):
value = getattr(self.item, request.form.get('file'), None)
if value:
return value.build_response()
form = Form(enctype='multipart/form-data', use_tabs=True)
self.item.fill_admin_form(form)
if not self.workflow.is_readonly():
submit_label = _('Submit')
if hasattr(self.item, 'submit_button_label'):
submit_label = self.item.submit_button_label
form.add_submit('submit', submit_label)
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'submit' and not form.has_errors():
self.item.submit_admin_form(form)
if not form.has_errors():
if isinstance(self.parent, WorkflowGlobalAction):
comment = _('Change in action "%(description)s" in global action "%(name)s"')
else:
comment = _('Change in action "%(description)s" in status "%(name)s"')
self.workflow.store(
comment
% {
'description': self.item.render_as_line(),
'name': self.parent.name,
}
)
if getattr(self.item, 'redirect_after_submit_url', None):
return redirect(self.item.redirect_after_submit_url)
return redirect('..')
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % self.item.description
r += form.render()
if self.item.support_substitution_variables:
r += get_publisher().substitutions.get_substitution_html_table()
return r.getvalue()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('You are about to remove an item.')))
form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('../../')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Item'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Deleting Item')
r += form.render()
return r.getvalue()
else:
del self.parent.items[self.parent.items.index(self.item)]
self.workflow.store(
comment=_('Deletion of action "%(description)s" in status "%(status)s"')
% {
'description': self.item.render_as_line(),
'status': self.parent.name,
}
)
return redirect('../../')
def copy(self):
form = Form(enctype='multipart/form-data')
destinations = [(x.id, x.name) for x in self.workflow.possible_status]
form.add(
SingleSelectWidget, 'status', title=_('Target status'), options=destinations, value=self.parent.id
)
form.add_submit('copy', _('Copy'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('../../')
if form.is_submitted() and not form.has_errors():
try:
return self.copy_submit(form)
except ValueError:
pass
get_response().breadcrumb.append(('copy', _('Copy')))
self.html_top(title=_('Copy Item'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Copy Item')
r += form.render()
return r.getvalue()
def copy_submit(self, form):
status_id = form.get_widget('status').parse()
destination_status = self.workflow.get_status(status_id)
item = self.item.export_to_xml('utf-8')
item_type = item.attrib['type']
new_item = destination_status.add_action(item_type)
new_item.parent = destination_status
try:
new_item.init_with_xml(item, 'utf-8', check_datasources=False)
except WorkflowImportError as e:
reason = _(e.msg) % e.msg_args
if hasattr(e, 'render'):
reason = e.render()
elif e.details:
reason += ' [%s]' % e.details
form.add_global_errors([reason])
raise ValueError()
self.workflow.store(
comment=_(
'Copy of action "%(description)s" from status "%(from_status)s" to status "%(destination_status)s"'
)
% {
'description': self.item.render_as_line(),
'from_status': self.parent.name,
'destination_status': destination_status.name,
}
)
return redirect('../../')
def _q_lookup(self, component):
t = self.item.q_admin_lookup(self.workflow, self.parent, component, self.html_top)
if t:
return t
return Directory._q_lookup(self, component)
class GlobalActionTriggerPage(Directory):
_q_exports = ['', 'delete']
def __init__(self, workflow, action, component, html_top):
try:
self.trigger = [x for x in action.triggers if x.id == component][0]
except (IndexError, ValueError):
raise errors.TraversalError()
self.workflow = workflow
self.action = action
self.status = action
self.html_top = html_top
get_response().breadcrumb.append(('triggers/%s/' % component, _('Trigger')))
def _q_index(self):
form = self.trigger.form(self.workflow)
form.add_submit('submit', _('Save'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'submit' and not form.has_errors():
self.trigger.submit_admin_form(form)
if not form.has_errors():
self.workflow.store(comment=_('Change in global action trigger'))
return redirect('../../')
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
r = TemplateIO(html=True)
r += htmltext('<h2>%s - %s</h2>') % (self.workflow.name, self.action.name)
r += form.render()
return r.getvalue()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('You are about to remove a trigger.')))
form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('../../')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Trigger'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Deleting Trigger')
r += form.render()
return r.getvalue()
else:
del self.action.triggers[self.action.triggers.index(self.trigger)]
self.workflow.store(comment=_('Deletion of global action trigger'))
return redirect('../../')
class ToChildDirectory(Directory):
_q_exports = ['']
klass = None
def __init__(self, workflow, status, html_top):
self.workflow = workflow
self.status = status
self.html_top = html_top
def _q_lookup(self, component):
return self.klass(self.workflow, self.status, component, self.html_top)
def _q_index(self):
return redirect('..')
class WorkflowItemsDir(ToChildDirectory):
klass = WorkflowItemPage
class GlobalActionTriggersDir(ToChildDirectory):
klass = GlobalActionTriggerPage
class GlobalActionItemsDir(ToChildDirectory):
klass = WorkflowItemPage
class WorkflowStatusPage(Directory):
_q_exports = [
'',
'delete',
'newitem',
('items', 'items_dir'),
'update_order',
'edit',
'reassign',
'endpoint',
'display',
('backoffice-info-text', 'backoffice_info_text'),
'fullscreen',
('schema.svg', 'svg'),
'svg',
]
do_not_call_in_templates = True
def __init__(self, workflow, status_id, html_top):
self.html_top = html_top
self.workflow = workflow
try:
self.status = [x for x in self.workflow.possible_status if x.id == status_id][0]
except IndexError:
raise errors.TraversalError()
self.items_dir = WorkflowItemsDir(workflow, self.status, html_top)
get_response().breadcrumb.append(('status/%s/' % status_id, self.status.name))
def _q_index(self):
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
get_response().add_javascript(
[
'jquery.js',
'jquery-ui.js',
'biglist.js',
'svg-pan-zoom.js',
'qommon.wysiwyg.js',
'popup.js',
'jquery.colourpicker.js',
]
)
return template.QommonTemplateResponse(
templates=['wcs/backoffice/workflow-status.html'],
context={'view': self, 'workflow': self.workflow, 'status': self.status, 'has_sidebar': True},
is_django_native=True,
)
def get_source_statuses(self):
statuses = []
for status in self.workflow.possible_status:
if status is self.status:
continue
for item in status.items:
if self.status in item.get_target_status():
statuses.append(status)
break
return statuses
def graphviz(self):
return graphviz(self.workflow, url_prefix='../../', include=True, select='%s' % self.status.id)
def fullscreen(self):
get_response().add_javascript(['jquery.js', 'svg-pan-zoom.js', 'qommon.admin.js'])
context = {
'view': self,
'workflow': self.workflow,
}
return template.QommonTemplateResponse(
templates=['wcs/backoffice/workflow-fullscreen-schema.html'],
context=context,
is_django_native=True,
)
def svg(self):
response = get_response()
response.set_content_type('image/svg+xml')
root_url = get_publisher().get_application_static_files_root_url()
css = root_url + get_publisher().qommon_static_dir + get_publisher().qommon_admin_css
return graphviz(
self.workflow, url_prefix='../../', include=False, select='%s' % self.status.id
).replace('?>', '?>\n<?xml-stylesheet href="%s" type="text/css"?>\n' % css)
def is_item_available(self, item):
return not item.is_disabled() and item.is_available(workflow=self.workflow)
def get_new_item_form(self):
form = Form(enctype='multipart/form-data', action='newitem', id='new-action-form')
categories = [
('status-change', _('Change Status')),
('interaction', _('Interact')),
('formdata-action', _('Act on a Form/Card')),
('user-action', _('Act on User')),
]
available_items = [x for x in item_classes if self.is_item_available(x)]
available_items.sort(key=lambda x: misc.simplify(x.description))
for category, category_label in categories:
options = [
(x.key, x(parent=self.status).description) for x in available_items if x.category == category
]
form.add(
SingleSelectWidget,
'action-%s' % category,
title=category_label,
required=False,
options=[(None, '')] + options,
)
form.add_submit('submit', _('Add'))
return form
def update_order(self):
get_response().set_content_type('application/json')
reordered_items = update_order(self.status.items)
if reordered_items is None:
return json.dumps({'err': 1})
self.status.items = reordered_items
self.workflow.store(comment=_('Change in action order'))
return json.dumps({'err': 0})
def newitem(self):
form = self.get_new_item_form()
if not form.is_submitted() or form.has_errors():
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
for category in ('status-change', 'interaction', 'formdata-action', 'user-action'):
action_type = form.get_widget('action-%s' % category).parse()
if action_type:
self.status.add_action(action_type)
self.workflow.store(
comment=_('New action "%(description)s" in status "%(status)s"')
% {
'description': self.status.items[-1].description,
'status': self.status.name,
}
)
return redirect('.')
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
def delete(self):
form = Form(enctype="multipart/form-data")
form.widgets.append(HtmlWidget('<p>%s</p>' % _("You are about to remove a status.")))
form.add_submit('delete', _('Delete'))
form.add_submit("cancel", _("Cancel"))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Status'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Status:'), self.status.name)
r += form.render()
return r.getvalue()
else:
# Before removing the status, scan formdata to know if it's in use.
for formdef in itertools.chain(
FormDef.select(lightweight=True), CardDef.select(lightweight=True)
):
if formdef.workflow_id != self.workflow.id:
continue
if formdef.data_class().exists([Equal('status', 'wf-%s' % self.status.id)]):
return redirect('reassign')
del self.workflow.possible_status[self.workflow.possible_status.index(self.status)]
self.workflow.store(comment=_('Deletion of status %s') % self.status.name)
return redirect('../../')
def edit(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Status Name'), required=True, size=30, value=self.status.name)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
new_name = str(form.get_widget('name').parse())
if [x for x in self.workflow.possible_status if x.name == new_name]:
form.get_widget('name').set_error(_('There is already a status with that name.'))
else:
self.status.name = new_name
self.workflow.store(comment=_('Change name of status %s') % new_name)
return redirect('.')
self.html_top(title=_('Edit Workflow Status'))
get_response().breadcrumb.append(('edit', _('Edit')))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Workflow Status')
r += form.render()
return r.getvalue()
def reassign(self):
carddefs = [x for x in CardDef.select(lightweight=True) if x.workflow_id == self.workflow.id]
formdefs = [x for x in FormDef.select(lightweight=True) if x.workflow_id == self.workflow.id]
if formdefs and carddefs:
remove_option_label = _('Remove these cards/forms')
change_option_label = _('Change these cards/forms status to "%s"')
description = _('There are forms or cards set to this status.')
elif carddefs:
remove_option_label = _('Remove these cards')
change_option_label = _('Change these cards status to "%s"')
description = _('There are cards set to this status.')
else:
remove_option_label = _('Remove these forms')
change_option_label = _('Change these forms status to "%s"')
description = _('There are forms set to this status.')
options = [(None, _('Do nothing')), ('remove', remove_option_label)]
for status in self.workflow.get_waitpoint_status():
if status.id == self.status.id:
continue
options.append(('reassign-%s' % status.id, change_option_label % status.name))
form = Form(enctype='multipart/form-data')
form.add(SingleSelectWidget, 'action', title=_('Pick an Action'), options=options)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.get_widget('action').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('reassign', _('Delete / Reassign')))
self.html_top(title=_('Delete Status / Reassign'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Status:'), self.status.name)
r += htmltext('<div class="remove-status-form">')
r += htmltext('<p>%s %s</p>') % (
description,
_('They need to be changed before this status can be deleted.'),
)
r += htmltext('</p>')
r += htmltext('<ul>')
for formdef in itertools.chain(formdefs, carddefs):
count = formdef.data_class().count([Equal('status', 'wf-%s' % self.status.id)])
if count:
r += htmltext('<li>%s%s %s %s</li>') % (
formdef.name,
_(':'),
count,
formdef.item_name if count < 2 else formdef.item_name_plural,
)
r += htmltext('</ul>')
r += form.render()
r += htmltext('</div>')
return r.getvalue()
else:
self.submit_reassign(form)
del self.workflow.possible_status[self.workflow.possible_status.index(self.status)]
self.workflow.store(comment=_('Removal of status %s') % self.status.name)
return redirect('../..')
def submit_reassign(self, form):
nb_forms = 0
action = form.get_widget('action').parse()
if action.startswith('reassign-'):
new_status = 'wf-%s' % str(action)[9:]
for formdef in itertools.chain(FormDef.select(), CardDef.select()):
if formdef.workflow_id != self.workflow.id:
continue
for item in formdef.data_class().get_with_indexed_value('status', 'wf-%s' % self.status.id):
nb_forms += 1
if action == 'remove':
item.remove_self()
else:
item.status = new_status
evo = Evolution()
evo.time = time.localtime()
evo.status = new_status
evo.comment = str(_('Administrator reassigned status'))
if not item.evolution:
item.evolution = []
item.evolution.append(evo)
item.store()
# delete all (old) status references in evolutions
for item in formdef.data_class().select():
if item.evolution:
modified = False
for evo in item.evolution:
if evo.status == self.status:
evo.status = None
modified = True
if modified:
item.store()
def display(self):
form = Form(enctype='multipart/form-data')
form.add(
CheckboxWidget,
'hide_status_from_user',
title=_('Hide status from user'),
value=bool(self.status.get_visibility_restricted_roles()),
)
form.add(ColourWidget, 'colour', title=_('Colour in backoffice'), value=self.status.colour)
form.add(
StringWidget,
'extra_css_class',
title=_('Extra CSS for frontoffice style'),
value=self.status.extra_css_class,
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
hide_status = form.get_widget('hide_status_from_user').parse()
if hide_status:
self.status.visibility = list(self.workflow.roles.keys())
else:
self.status.visibility = None
self.status.colour = form.get_widget('colour').parse() or 'ffffff'
self.status.extra_css_class = form.get_widget('extra_css_class').parse()
self.workflow.store(comment=_('Change in display options'))
return redirect('.')
self.html_top(title=_('Change Display Settings'))
get_response().breadcrumb.append(('display', _('Display Settings')))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Change Display Settings')
r += form.render()
return r.getvalue()
def endpoint(self):
form = Form(enctype='multipart/form-data')
form.add(
CheckboxWidget,
'force_terminal_status',
title=_('Force Terminal Status'),
value=(self.status.forced_endpoint is True),
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
self.status.forced_endpoint = form.get_widget('force_terminal_status').parse()
self.workflow.store(comment=_('Change of terminal status option'))
return redirect('.')
self.html_top(title=_('Edit Terminal Status'))
get_response().breadcrumb.append(('endpoint', _('Terminal Status')))
return form.render()
def backoffice_info_text(self):
form = Form(enctype='multipart/form-data')
form.add(
WysiwygTextWidget,
'backoffice_info_text',
title=_('Information text for backoffice'),
value=self.status.backoffice_info_text,
)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
self.status.backoffice_info_text = form.get_widget('backoffice_info_text').parse()
self.workflow.store(comment=_('Change in backoffice info text'))
return redirect('.')
self.html_top(title=_('Edit Backoffice Information Text'))
get_response().breadcrumb.append(('backoffice_info_text', _('Backoffice Information Text')))
return form.render()
class WorkflowStatusDirectory(Directory):
_q_exports = ['']
def __init__(self, workflow, html_top):
self.workflow = workflow
self.html_top = html_top
def _q_lookup(self, component):
return WorkflowStatusPage(self.workflow, component, self.html_top)
def _q_index(self):
return redirect('..')
class WorkflowVariableWidget(CompositeWidget):
def __init__(self, name, value=None, workflow=None, **kwargs):
CompositeWidget.__init__(self, name, **kwargs)
if value and '*' in value:
varname = None
else:
varname = value
self.add(VarnameWidget, 'name', render_br=False, value=varname)
options = []
if workflow:
excluded_parameters = ['backoffice_info_text']
for status in workflow.possible_status:
for item in status.items:
prefix = '%s*%s*' % (status.id, item.id)
parameters = [
x
for x in item.get_parameters()
if not getattr(item, x) and x not in excluded_parameters
]
label = getattr(item, 'label', None) or item.description
for parameter in parameters:
key = prefix + parameter
fake_form = Form()
item.add_parameters_widgets(fake_form, [parameter], orig='variable_widget')
if not fake_form.widgets:
continue
parameter_label = fake_form.widgets[0].title
option_value = '%s / %s / %s' % (status.name, label, parameter_label)
options.append((key, option_value, key))
if not options:
return
options = [('', '---', '')] + options
self.widgets.append(
HtmlWidget(_('or you can use this field to directly replace a workflow parameter:'))
)
self.add(
SingleSelectWidget,
'select',
options=options,
value=value,
hint=_('This takes priority over a variable name'),
)
def _parse(self, request):
super()._parse(request)
if self.get('select'):
self.value = self.get('select')
elif self.get('name'):
self.value = self.get('name')
class WorkflowVariablesFieldDefPage(FieldDefPage):
section = 'workflows'
blacklisted_attributes = ['condition', 'prefill', 'display_locations']
def form(self):
form = super().form()
form.remove('varname')
form.add(
WorkflowVariableWidget,
'varname',
title=_('Variable'),
value=self.field.varname,
advanced=False,
required=True,
workflow=self.objectdef.workflow,
)
return form
class WorkflowBackofficeFieldDefPage(FieldDefPage):
section = 'workflows'
blacklisted_attributes = ['condition']
def form(self):
form = super().form()
form.remove('prefill')
display_locations = form.get_widget('display_locations')
if display_locations:
# remove validation page from choices
display_locations.options = display_locations.options[1:]
return form
def schedule_statistics_data_update(self):
formdefs = [
x
for x in FormDef.select(lightweight=True) + CardDef.select(lightweight=True)
if x.workflow_id == self.objectdef.workflow.id
]
get_response().add_after_job(UpdateStatisticsDataAfterJob(formdefs=formdefs))
class WorkflowVariablesFieldsDirectory(FieldsDirectory):
_q_exports = ['', 'update_order', 'new']
section = 'workflows'
field_def_page_class = WorkflowVariablesFieldDefPage
support_import = False
blacklisted_types = ['page', 'blocks', 'computed']
field_var_prefix = 'form_option_'
readonly_message = _('This workflow is readonly.')
new_field_history_message = _('New workflow option "%s"')
field_count_message = _('This workflow contains %d variables.')
field_over_count_message = _('This workflow contains more than %d variables.')
fields_count_total_soft_limit = 40
fields_count_total_hard_limit = 80
def index_top(self):
r = TemplateIO(html=True)
r += htmltext('<h2>%s - %s - %s</h2>') % (_('Workflow'), self.objectdef.name, _('Variables'))
r += get_session().display_message()
if not self.objectdef.fields:
r += htmltext('<p>%s</p>') % _('There are not yet any variables.')
return r.getvalue()
def index_bottom(self):
pass
class WorkflowBackofficeFieldsDirectory(FieldsDirectory):
_q_exports = ['', 'update_order', 'new']
section = 'workflows'
field_def_page_class = WorkflowBackofficeFieldDefPage
support_import = False
blacklisted_types = ['page', 'computed']
blacklisted_attributes = ['condition']
field_var_prefix = 'form_var_'
readonly_message = _('This workflow is readonly.')
new_field_history_message = _('New backoffice field "%s"')
field_count_message = _('This workflow contains %d backoffice fields.')
field_over_count_message = _('This workflow contains more than %d backoffice fields.')
fields_count_total_soft_limit = 40
fields_count_total_hard_limit = 80
def index_top(self):
r = TemplateIO(html=True)
r += htmltext('<h2>%s - %s - %s</h2>') % (_('Workflow'), self.objectdef.name, _('Backoffice Fields'))
r += get_session().display_message()
if not self.objectdef.fields:
r += htmltext('<p>%s</p>') % _('There are not yet any backoffice fields.')
return r.getvalue()
def index_bottom(self):
pass
class VariablesDirectory(Directory):
_q_exports = ['', 'fields']
def __init__(self, workflow):
self.workflow = workflow
def _q_index(self):
return redirect('fields/')
def _q_traverse(self, path):
get_response().breadcrumb.append(('variables/', _('Variables')))
self.fields = WorkflowVariablesFieldsDirectory(WorkflowVariablesFieldsFormDef(self.workflow))
return Directory._q_traverse(self, path)
class BackofficeFieldsDirectory(Directory):
_q_exports = ['', 'fields']
def __init__(self, workflow):
self.workflow = workflow
def _q_index(self):
return redirect('fields/')
def _q_traverse(self, path):
get_response().breadcrumb.append(('backoffice-fields/', _('Backoffice Fields')))
self.fields = WorkflowBackofficeFieldsDirectory(WorkflowBackofficeFieldsFormDef(self.workflow))
return Directory._q_traverse(self, path)
class FunctionsDirectory(Directory):
_q_exports = ['', 'new']
def __init__(self, workflow):
self.workflow = workflow
def _q_traverse(self, path):
get_response().breadcrumb.append(('functions/', _('Functions')))
return Directory._q_traverse(self, path)
def new(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add_submit('submit', _('Add'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
base_slug = slug = '_%s' % misc.simplify(name)
base_idx = 2
while slug in self.workflow.roles:
slug = '%s-%s' % (base_slug, base_idx)
base_idx += 1
self.workflow.roles[slug] = name
# go over all existing status and update their visibility
# restrictions if necessary
for status in self.workflow.possible_status:
if status.get_visibility_restricted_roles():
status.visibility = list(self.workflow.roles.keys())
self.workflow.store(comment=_('New function "%s"') % name)
return redirect('..')
get_response().breadcrumb.append(('new', _('New Function')))
html_top('workflows', title=_('New Function'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Function')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
function = self.workflow.roles.get('_' + component)
if not function:
raise errors.TraversalError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50, value=function)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if component != 'receiver':
# do not allow removing the standard "receiver" function.
# TODO: do not display "delete" for functions that are currently in
# use.
form.add_submit('delete', _('Delete'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'delete':
slug = '_%s' % component
name = self.workflow.roles[slug]
del self.workflow.roles[slug]
self.workflow.store(comment=_('Deletion of function "%s"') % name)
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
slug = '_%s' % component
self.workflow.roles[slug] = name
self.workflow.store(comment=_('Rename of function "%s"') % name)
return redirect('..')
get_response().breadcrumb.append(('new', _('Edit Function')))
html_top('workflows', title=_('Edit Function'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Function')
r += form.render()
return r.getvalue()
def _q_index(self):
return redirect('..')
class CriticalityLevelsDirectory(Directory):
_q_exports = ['', 'new']
def __init__(self, workflow):
self.workflow = workflow
def _q_traverse(self, path):
get_response().breadcrumb.append(('criticality-levels/', _('Criticality Levels')))
return Directory._q_traverse(self, path)
def new(self):
currentlevels = self.workflow.criticality_levels or []
default_colours = ['FFFFFF', 'FFFF00', 'FF9900', 'FF6600', 'FF0000']
try:
default_colour = default_colours[len(currentlevels)]
except IndexError:
default_colour = '000000'
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add(ColourWidget, 'colour', title=_('Colour'), required=False, value=default_colour)
form.add_submit('submit', _('Add'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
if not self.workflow.criticality_levels:
self.workflow.criticality_levels = []
level = WorkflowCriticalityLevel()
level.name = form.get_widget('name').parse()
level.colour = form.get_widget('colour').parse()
self.workflow.criticality_levels.append(level)
self.workflow.store(comment=_('New criticality level'))
return redirect('..')
get_response().breadcrumb.append(('new', _('New Criticality Level')))
html_top('workflows', title=_('New Criticality level'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Criticality Level')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
for level in self.workflow.criticality_levels or []:
if level.id == component:
break
else:
raise errors.TraversalError()
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50, value=level.name)
form.add(ColourWidget, 'colour', title=_('Colour'), required=False, value=level.colour)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
form.add_submit('delete-level', _('Delete'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.get_submit() == 'delete-level':
self.workflow.criticality_levels.remove(level)
self.workflow.store(comment=_('Deletion of criticality level'))
return redirect('..')
if form.is_submitted() and not form.has_errors():
level.name = form.get_widget('name').parse()
level.colour = form.get_widget('colour').parse()
self.workflow.store(comment=_('Change of name of criticality level'))
return redirect('..')
get_response().breadcrumb.append(('new', _('Edit Criticality Level')))
html_top('workflows', title=_('Edit Criticality Level'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Criticality Level')
r += form.render()
return r.getvalue()
def _q_index(self):
return redirect('..')
class GlobalActionPage(WorkflowStatusPage):
_q_exports = [
'',
'new',
'delete',
'newitem',
('items', 'items_dir'),
'update_order',
'edit',
'newtrigger',
('triggers', 'triggers_dir'),
'update_triggers_order',
('backoffice-info-text', 'backoffice_info_text'),
]
def __init__(self, workflow, action_id, html_top):
self.html_top = html_top
self.workflow = workflow
try:
self.action = [x for x in self.workflow.global_actions if x.id == action_id][0]
except IndexError:
raise errors.TraversalError()
self.status = self.action
self.items_dir = GlobalActionItemsDir(workflow, self.action, html_top)
self.triggers_dir = GlobalActionTriggersDir(workflow, self.action, html_top)
def _q_traverse(self, path):
get_response().breadcrumb.append(
('global-actions/%s/' % self.action.id, _('Global Action: %s') % self.action.name)
)
return Directory._q_traverse(self, path)
def is_item_available(self, item):
return not item.is_disabled() and item.is_available(self.workflow) and item.ok_in_global_action
def _q_index(self):
self.html_top('%s - %s' % (_('Workflow'), self.workflow.name))
r = TemplateIO(html=True)
get_response().add_javascript(['jquery.js', 'jquery-ui.js', 'biglist.js', 'qommon.wysiwyg.js'])
r += htmltext('<h2>%s</h2>') % self.action.name
r += get_session().display_message()
r += htmltext('<div class="bo-block">')
r += htmltext('<h2>%s</h2>') % _('Actions')
if not self.action.items:
r += htmltext('<p>%s</p>') % _('There are not yet any actions.')
else:
if self.workflow.is_readonly():
r += htmltext('<ul id="items-list" class="biglist readonly">')
else:
r += htmltext('<p class="items">')
r += str(_('Use drag and drop with the handles to reorder actions.'))
r += htmltext('</p>')
r += htmltext('<ul id="items-list" class="biglist sortable">')
for item in self.action.items:
r += htmltext('<li class="biglistitem" id="itemId_%s">') % item.id
if self.workflow.is_readonly():
r += item.render_as_line()
else:
if hasattr(item, 'fill_admin_form'):
r += htmltext('<a href="items/%s/">%s</a>') % (item.id, item.render_as_line())
else:
r += item.render_as_line()
r += htmltext('<p class="commands">')
if hasattr(item, 'fill_admin_form'):
r += command_icon('items/%s/' % item.id, 'edit')
r += command_icon('items/%s/delete' % item.id, 'remove', popup=True)
r += htmltext('</p>')
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>') # bo-block
sortable = 'sortable readonly' if self.workflow.is_readonly() else 'sortable'
r += htmltext('<div class="bo-block">')
r += htmltext('<h2>%s</h2>') % _('Triggers')
r += (
htmltext('<ul id="items-list" class="biglist %s" data-order-function="update_triggers_order">')
% sortable
)
for trigger in self.action.triggers:
r += htmltext('<li class="biglistitem" id="trigId_%s">') % trigger.id
r += htmltext('<a rel="popup" href="triggers/%s/">%s</a>') % (
trigger.id,
trigger.render_as_line(),
)
r += htmltext('<p class="commands">')
r += command_icon('triggers/%s/' % trigger.id, 'edit', popup=True)
r += command_icon('triggers/%s/delete' % trigger.id, 'remove', popup=True)
r += htmltext('</li>')
r += htmltext('</ul>')
r += htmltext('</div>') # bo-block
r += htmltext('<p><a href="../../">%s</a></p>') % _('Back to workflow main page')
get_response().filter['sidebar'] = self.get_sidebar()
return r.getvalue()
def delete(self):
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('You are about to remove an action.')))
form.add_submit('delete', _('Delete'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Action'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Action:'), self.action.name)
r += form.render()
return r.getvalue()
del self.workflow.global_actions[self.workflow.global_actions.index(self.action)]
self.workflow.store(comment=_('Deletion of global action "%s"') % self.action.name)
return redirect('../../')
def edit(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Action Name'), required=True, size=30, value=self.action.name)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
new_name = str(form.get_widget('name').parse())
if [x for x in self.workflow.global_actions if x.name == new_name]:
form.get_widget('name').set_error(_('There is already an action with that name.'))
else:
self.action.name = new_name
self.workflow.store(comment=_('Change in global action'))
return redirect('.')
self.html_top(title=_('Edit Action Name'))
get_response().breadcrumb.append(('edit', _('Edit')))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Edit Action Name')
r += form.render()
return r.getvalue()
def get_sidebar(self):
get_response().add_javascript(['popup.js', 'jquery.colourpicker.js'])
r = TemplateIO(html=True)
if self.workflow.is_default():
r += htmltext('<p>')
r += str(
_(
'''This is the default workflow, you cannot edit it but you can
duplicate it to base your own workflow on it.'''
)
)
r += htmltext('</p>')
else:
r += htmltext('<ul id="sidebar-actions">')
r += htmltext('<li><a href="edit" rel="popup">%s</a></li>') % _('Change Action Name')
r += htmltext('<li><a href="backoffice-info-text" rel="popup">%s</a></li>') % _(
'Change Backoffice Information Text'
)
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
r += htmltext('</ul>')
r += htmltext('<div id="new-field">')
r += htmltext('<h3>%s</h3>') % _('New Item')
r += self.get_new_item_form().render()
r += htmltext('</div>')
r += htmltext('<div id="new-trigger">')
r += htmltext('<h3>%s</h3>') % _('New Trigger')
r += self.get_new_trigger_form().render()
r += htmltext('</div>')
return r.getvalue()
def update_triggers_order(self):
request = get_request()
new_order = request.form['order'].strip(';').split(';')
self.action.triggers = [[x for x in self.action.triggers if x.id == y][0] for y in new_order]
self.workflow.store(comment=_('Change in trigger order'))
return 'ok'
def newtrigger(self):
form = self.get_new_trigger_form()
if not form.is_submitted() or form.has_errors():
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
if form.get_widget('type').parse():
self.action.append_trigger(form.get_widget('type').parse())
else:
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
self.workflow.store(comment=_('New global action trigger'))
return redirect('.')
def get_new_trigger_form(self):
form = Form(enctype='multipart/form-data', action='newtrigger')
available_triggers = [
('timeout', _('Automatic')),
('manual', _('Manual')),
('webservice', _('External call')),
]
form.add(SingleSelectWidget, 'type', title=_('Type'), required=True, options=available_triggers)
form.add_submit('submit', _('Add'))
return form
class GlobalActionsDirectory(Directory):
_q_exports = ['', 'new']
def __init__(self, workflow, html_top):
self.workflow = workflow
self.html_top = html_top
def _q_lookup(self, component):
return GlobalActionPage(self.workflow, component, self.html_top)
def _q_index(self):
return redirect('..')
def new(self):
form = Form(enctype='multipart/form-data')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add_submit('submit', _('Add'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
name = form.get_widget('name').parse()
try:
action = self.workflow.add_global_action(name)
except DuplicateGlobalActionNameError:
form.get_widget('name').set_error(_('There is already an action with that name.'))
else:
self.workflow.store(comment=_('New global action'))
return redirect('%s/' % action.id)
get_response().breadcrumb.append(('new', _('New Global Action')))
html_top('workflows', title=_('New Global Action'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Global Action')
r += form.render()
return r.getvalue()
class WorkflowPage(Directory):
_q_exports = [
'',
'edit',
'category',
'delete',
'newstatus',
('status', 'status_dir'),
'update_order',
'duplicate',
'export',
'svg',
('variables', 'variables_dir'),
'inspect',
('schema.svg', 'svg'),
('backoffice-fields', 'backoffice_fields_dir'),
'update_actions_order',
'update_criticality_levels_order',
('functions', 'functions_dir'),
('global-actions', 'global_actions_dir'),
('criticality-levels', 'criticality_levels_dir'),
('logged-errors', 'logged_errors_dir'),
('history', 'snapshots_dir'),
('fullscreen'),
]
do_not_call_in_templates = True
def __init__(self, component, instance=None):
if instance:
self.workflow = instance
elif component == '_carddef_default':
self.workflow = CardDef.get_default_workflow()
else:
try:
self.workflow = Workflow.get(component)
except KeyError:
raise errors.TraversalError()
self.workflow_ui = WorkflowUI(self.workflow)
self.status_dir = WorkflowStatusDirectory(self.workflow, self.html_top)
self.variables_dir = VariablesDirectory(self.workflow)
self.backoffice_fields_dir = BackofficeFieldsDirectory(self.workflow)
self.functions_dir = FunctionsDirectory(self.workflow)
self.global_actions_dir = GlobalActionsDirectory(self.workflow, self.html_top)
self.criticality_levels_dir = CriticalityLevelsDirectory(self.workflow)
self.logged_errors_dir = LoggedErrorsDirectory(parent_dir=self, workflow_id=self.workflow.id)
self.snapshots_dir = SnapshotsDirectory(self.workflow)
if component:
get_response().breadcrumb.append((component + '/', self.workflow.name))
def html_top(self, title):
return html_top('workflows', title)
def category(self):
category_options = self.workflow_ui.get_categories()
if is_global_accessible():
category_options = [(None, '---', '')] + list(category_options)
form = Form(enctype='multipart/form-data')
form.widgets.append(HtmlWidget('<p>%s</p>' % _('Select a category for this workflow.')))
form.add(
SingleSelectWidget,
'category_id',
title=_('Category'),
value=self.workflow.category_id,
options=category_options,
)
if not self.workflow.is_readonly():
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('..')
if form.is_submitted() and not form.has_errors():
widget = form.get_widget('category_id')
old_value = self.workflow.category_id
new_value = widget.parse()
if new_value != old_value:
self.workflow.category_id = new_value
self.workflow.store(comment=_('Change of category'))
return redirect('.')
html_top('workflows', title=self.workflow.name)
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Category')
r += form.render()
return r.getvalue()
def last_modification_block(self):
return utils.last_modification_block(obj=self.workflow)
def graphviz(self):
return graphviz(self.workflow, include=True)
def fullscreen(self):
get_response().add_javascript(['jquery.js', 'svg-pan-zoom.js', 'qommon.admin.js'])
context = {
'view': self,
'workflow': self.workflow,
}
return template.QommonTemplateResponse(
templates=['wcs/backoffice/workflow-fullscreen-schema.html'],
context=context,
is_django_native=True,
)
def _q_index(self):
self.html_top(title=_('Workflow - %s') % self.workflow.name)
get_response().filter['sidebar'] = self.get_sidebar()
get_response().add_javascript(
['jquery.js', 'jquery-ui.js', 'biglist.js', 'svg-pan-zoom.js', 'jquery.colourpicker.js']
)
return template.QommonTemplateResponse(
templates=['wcs/backoffice/workflow.html'],
context={'view': self, 'workflow': self.workflow},
)
def get_sidebar(self):
r = TemplateIO(html=True)
if self.workflow.is_default():
r += htmltext('<p>')
r += str(
_(
'''This is the default workflow, you cannot edit it but you can
duplicate it to base your own workflow on it.'''
)
)
r += htmltext('</p>')
elif self.workflow.is_readonly():
r += htmltext('<div class="infonotice"><p>%s</p></div>') % _('This workflow is readonly.')
r += utils.snapshot_info_block(snapshot=self.workflow.snapshot_object)
return r.getvalue()
r += htmltext('<ul id="sidebar-actions">')
if not self.workflow.is_readonly():
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
r += htmltext('<li><a rel="popup" href="duplicate">%s</a></li>') % _('Duplicate')
r += htmltext('<li><a href="export">%s</a></li>') % _('Export')
if get_publisher().snapshot_class and not self.workflow.is_default():
r += htmltext('<li><a rel="popup" href="history/save">%s</a></li>') % _('Save snapshot')
r += htmltext('<li><a href="history/">%s</a></li>') % _('History')
r += htmltext('<li><a href="inspect">%s</a></li>') % _('Inspector')
r += htmltext('</ul>')
if not self.workflow.is_readonly():
r += self.get_new_status_form()
r += LoggedErrorsDirectory.errors_block(workflow_id=self.workflow.id)
return r.getvalue()
def inspect(self):
self.html_top(self.workflow.name)
get_response().breadcrumb.append(('inspect', _('Inspector')))
return self.render_inspect()
def render_inspect(self):
context = {'workflow': self.workflow, 'view': self}
return template.QommonTemplateResponse(
templates=['wcs/backoffice/workflow-inspect.html'], context=context
)
def svg(self):
response = get_response()
response.set_content_type('image/svg+xml')
root_url = get_publisher().get_application_static_files_root_url()
css = root_url + get_publisher().qommon_static_dir + get_publisher().qommon_admin_css
return graphviz(self.workflow, include=False).replace(
'?>', '?>\n<?xml-stylesheet href="%s" type="text/css"?>\n' % css
)
def export(self):
return misc.xml_response(
self.workflow,
filename='workflow-%s.wcs' % misc.simplify(self.workflow.name),
content_type='application/x-wcs-workflow',
)
def get_new_status_form(self):
r = TemplateIO(html=True)
r += htmltext('<div id="new-field">')
r += htmltext('<h3>%s</h3>') % _('New Status')
form = Form(enctype='multipart/form-data', action='newstatus')
form.add(StringWidget, 'name', title=_('Name'), required=True, size=50)
form.add_submit('submit', _('Add'))
r += form.render()
r += htmltext('</div>')
return r.getvalue()
def update_order(self):
get_response().set_content_type('application/json')
new_possible_status = update_order(self.workflow.possible_status)
if new_possible_status is None:
return json.dumps({'err': 1})
self.workflow.possible_status = new_possible_status
self.workflow.store(comment=_('Change in status order'))
return json.dumps({'err': 0})
def update_actions_order(self):
get_response().set_content_type('application/json')
new_global_actions = update_order(self.workflow.global_actions)
if new_global_actions is None:
return json.dumps({'err': 1})
self.workflow.global_actions = new_global_actions
self.workflow.store(comment=_('Change in global actions order'))
return json.dumps({'err': 0})
def update_criticality_levels_order(self):
get_response().set_content_type('application/json')
new_criticality_levels = update_order(self.workflow.criticality_levels)
if new_criticality_levels is None:
return json.dumps({'err': 1})
self.workflow.criticality_levels = new_criticality_levels
self.workflow.store(comment=_('Change in criticality levels order'))
return json.dumps({'err': 0})
def newstatus(self):
form = Form(enctype='multipart/form-data', action='newstatus')
form.add(StringWidget, 'name', title=_('Name'), size=50)
if not form.is_submitted() or form.has_errors():
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
if form.get_widget('name').parse():
try:
self.workflow.add_status(form.get_widget('name').parse())
except DuplicateStatusNameError:
get_session().message = ('error', _('There is already a status with that name.'))
return redirect('.')
else:
get_session().message = ('error', _('Submitted form was not filled properly.'))
return redirect('.')
self.workflow.store(comment=_('New status "%s"') % form.get_widget('name').parse())
return redirect('.')
def edit(self):
form = self.workflow_ui.form_edit()
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
self.workflow_ui.submit_form(form)
except ValueError:
pass
else:
return redirect('.')
self.html_top(title=_('Edit Workflow'))
r = TemplateIO(html=True)
get_response().breadcrumb.append(('edit', _('Edit')))
r += htmltext('<h2>%s</h2>') % _('Edit Workflow')
r += form.render()
return r.getvalue()
def delete(self):
form = Form(enctype="multipart/form-data")
from itertools import chain
for objdef in chain(FormDef.select(), CardDef.select()):
if objdef.workflow_id == self.workflow.id:
form.widgets.append(
HtmlWidget('<p>%s</p>' % _("This workflow is currently in use, you cannot remove it."))
)
form.add_submit("cancel", _("Cancel"))
break
else:
form.widgets.append(
HtmlWidget('<p>%s</p>' % _("You are about to irrevocably delete this workflow."))
)
form.add_submit('delete', _('Delete'))
form.add_submit("cancel", _("Cancel"))
if form.get_widget('cancel').parse():
return redirect('.')
if not form.is_submitted() or form.has_errors():
get_response().breadcrumb.append(('delete', _('Delete')))
self.html_top(title=_('Delete Workflow'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s %s</h2>') % (_('Deleting Workflow:'), self.workflow.name)
r += form.render()
return r.getvalue()
else:
self.workflow.remove_self()
return redirect('..')
def duplicate(self):
form = Form(enctype='multipart/form-data')
name_widget = form.add(StringWidget, 'name', title=_('Name'), required=True, size=30)
form.add_submit('submit', _('Submit'))
form.add_submit('cancel', _('Cancel'))
if form.get_widget('cancel').parse():
return redirect('.')
if not is_global_accessible() and self.workflow.id in ('_default', '_carddef_default'):
category_options = self.workflow_ui.get_categories()
form.add(
SingleSelectWidget,
'category_id',
title=_('Category'),
options=category_options,
)
if not form.is_submitted():
original_name = self.workflow_ui.workflow.name
new_name = '%s %s' % (original_name, _('(copy)'))
names = [x.name for x in Workflow.select()]
no = 2
while new_name in names:
new_name = _('%(name)s (copy %(no)d)') % {'name': original_name, 'no': no}
no += 1
name_widget.set_value(new_name)
if form.is_submitted() and not form.has_errors():
try:
return self.duplicate_submit(form)
except ValueError:
pass
self.html_top(title=_('Duplicate Workflow'))
r = TemplateIO(html=True)
get_response().breadcrumb.append(('duplicate', _('Duplicate')))
r += htmltext('<h2>%s</h2>') % _('Duplicate Workflow')
r += form.render()
return r.getvalue()
def duplicate_submit(self, form):
# duplicate via xml export and import to get clean copy of
# inner actions.
tree = self.workflow_ui.workflow.export_to_xml(include_id=True)
try:
new_workflow = Workflow.import_from_xml_tree(tree, check_datasources=False)
except WorkflowImportError as e:
reason = _(e.msg) % e.msg_args
if hasattr(e, 'render'):
reason = e.render()
elif e.details:
reason += ' [%s]' % e.details
form.add_global_errors([reason])
raise ValueError()
new_workflow.name = form.get_widget('name').parse()
new_workflow.slug = None
if form.get_widget('category_id'):
new_workflow.category_id = form.get_widget('category_id').parse()
new_workflow.store()
return redirect('../%s/' % new_workflow.id)
class NamedDataSourcesDirectoryInWorkflows(NamedDataSourcesDirectory):
pass
class WorkflowsDirectory(Directory):
_q_exports = [
'',
'new',
'categories',
('import', 'p_import'),
('data-sources', 'data_sources'),
('mail-templates', 'mail_templates'),
('comment-templates', 'comment_templates'),
]
data_sources = NamedDataSourcesDirectoryInWorkflows()
mail_templates = MailTemplatesDirectory()
comment_templates = CommentTemplatesDirectory()
category_class = WorkflowCategory
categories = WorkflowCategoriesDirectory()
def html_top(self, title):
return html_top('workflows', title)
def _q_traverse(self, path):
get_response().breadcrumb.append(('workflows/', _('Workflows')))
return super()._q_traverse(path)
def is_accessible(self, user):
if is_global_accessible():
return True
# check for access to specific categories
user_roles = set(user.get_roles())
for category in WorkflowCategory.select():
management_roles = {x.id for x in getattr(category, 'management_roles') or []}
if management_roles and user_roles.intersection(management_roles):
return True
return False
def _q_index(self):
self.html_top(title=_('Workflows'))
r = TemplateIO(html=True)
r += htmltext('<div id="appbar">')
r += htmltext('<h2>%s</h2>') % _('Workflows')
r += htmltext('<span class="actions">')
if is_global_accessible():
r += htmltext('<a href="comment-templates/">%s</a>') % _('Comment Templates')
r += htmltext('<a href="mail-templates/">%s</a>') % _('Mail Templates')
r += htmltext('<a href="data-sources/">%s</a>') % _('Data sources')
r += htmltext('<a href="categories/">%s</a>') % _('Categories')
r += htmltext('<a href="import" rel="popup">%s</a>') % _('Import')
r += htmltext('<a class="new-item" rel="popup" href="new">%s</a>') % _('New Workflow')
r += htmltext('</span>')
r += htmltext('</div>')
formdef_workflows = [Workflow.get_default_workflow()]
workflows_in_formdef_use = set(formdef_workflows[0].id)
for formdef in FormDef.select(lightweight=True):
workflows_in_formdef_use.add(str(formdef.workflow_id))
carddef_workflows = [CardDef.get_default_workflow()]
workflows_in_carddef_use = set(carddef_workflows[0].id)
for carddef in CardDef.select(lightweight=True):
workflows_in_carddef_use.add(str(carddef.workflow_id))
shared_workflows = []
unused_workflows = []
workflows = formdef_workflows + carddef_workflows
for workflow in Workflow.select(order_by='name'):
if str(workflow.id) in workflows_in_formdef_use and str(workflow.id) in workflows_in_carddef_use:
shared_workflows.append(workflow)
elif str(workflow.id) in workflows_in_formdef_use:
formdef_workflows.append(workflow)
elif str(workflow.id) in workflows_in_carddef_use:
carddef_workflows.append(workflow)
if str(workflow.id) in workflows_in_formdef_use or str(workflow.id) in workflows_in_carddef_use:
workflows.append(workflow)
else:
unused_workflows.append(workflow)
if is_global_accessible():
categories = WorkflowCategory.select()
else:
categories = []
user_roles = set(get_request().user.get_roles())
for category in WorkflowCategory.select():
management_roles = {x.id for x in getattr(category, 'management_roles') or []}
if management_roles and user_roles.intersection(management_roles):
categories.append(category)
self.category_class.sort_by_position(categories)
if categories:
default_category = WorkflowCategory('Default')
default_category.id = '_default_category'
for workflow in workflows:
if workflow.id in ('_default', '_carddef_default'):
workflow.category_id = default_category.id
categories = [default_category] + categories
if is_global_accessible():
categories = categories + [None]
def workflow_section(r, workflows):
r += htmltext('<ul class="objects-list single-links">')
for workflow in workflows:
if workflow in shared_workflows:
css_class = 'shared-workflow'
usage_label = _('Forms and card models')
elif workflow in formdef_workflows:
css_class = 'formdef-workflow'
usage_label = _('Forms')
elif workflow in carddef_workflows:
css_class = 'carddef-workflow'
usage_label = _('Card models')
else:
css_class = 'unused-workflow'
usage_label = _('Unused')
r += htmltext('<li class="%s">' % css_class)
r += htmltext('<a href="%s/">%s</a>') % (
workflow.id,
workflow.name,
)
if usage_label and carddef_workflows:
r += htmltext('<span class="badge">%s</span>') % usage_label
r += htmltext('</li>')
r += htmltext('</ul>')
for category in categories:
if category is None:
category_workflows = [x for x in workflows + unused_workflows if not x.category_id]
else:
category_workflows = [
x for x in workflows + unused_workflows if x.category_id == str(category.id)
]
if category_workflows:
if len(categories) > 1:
r += htmltext('<div class="section">')
if category is None:
r += htmltext('<h2>%s</h2>') % _('Uncategorised')
elif category.id == '_default_category':
pass # no title
else:
r += htmltext('<h2>%s</h2>') % category.name
workflow_section(r, category_workflows)
if len(categories) > 1:
r += htmltext('</div>')
return r.getvalue()
def new(self):
get_response().breadcrumb.append(('new', _('New')))
workflow_ui = WorkflowUI(None)
form = workflow_ui.form_new()
if form.get_widget('cancel').parse():
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
workflow = workflow_ui.submit_form(form)
except ValueError:
pass
else:
return redirect('%s/' % workflow.id)
self.html_top(title=_('New Workflow'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('New Workflow')
r += form.render()
return r.getvalue()
def _q_lookup(self, component):
directory = WorkflowPage(component)
if directory.workflow.id in ('_default', '_carddef_default'):
return directory
if not directory.workflow.has_user_access(get_request().user):
raise errors.AccessForbiddenError()
return directory
def p_import(self):
form = Form(enctype='multipart/form-data')
form.add(FileWidget, 'file', title=_('File'), required=False)
form.add(UrlWidget, 'url', title=_('Address'), required=False, size=50)
form.add_submit('submit', _('Import Workflow'))
form.add_submit('cancel', _('Cancel'))
if form.get_submit() == 'cancel':
return redirect('.')
if form.is_submitted() and not form.has_errors():
try:
return self.import_submit(form)
except ValueError:
pass
get_response().breadcrumb.append(('import', _('Import')))
self.html_top(title=_('Import Workflow'))
r = TemplateIO(html=True)
r += htmltext('<h2>%s</h2>') % _('Import Workflow')
r += htmltext('<p>%s</p>') % _(
'You can install a new workflow by uploading a file or by pointing to the workflow URL.'
)
r += form.render()
return r.getvalue()
def import_submit(self, form):
if form.get_widget('file').parse():
fp = form.get_widget('file').parse().fp
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:
form.set_error('file', _('You have to enter a file or a URL.'))
raise ValueError()
error, reason = False, None
try:
workflow = Workflow.import_from_xml(fp)
except WorkflowImportError as e:
error = True
reason = _(e.msg) % e.msg_args
if hasattr(e, 'render'):
form.add_global_errors([e.render()])
elif e.details:
reason += ' [%s]' % e.details
except ValueError:
error = True
if not error:
global_access = is_global_accessible()
if not global_access:
management_roles = {x.id for x in getattr(workflow.category, 'management_roles', None) or []}
user_roles = set(get_request().user.get_roles())
if not user_roles.intersection(management_roles):
error = True
reason = _('unauthorized category')
if error:
if reason:
msg = _('Invalid File (%s)') % reason
else:
msg = _('Invalid File')
if form.get_widget('url').parse():
form.set_error('url', msg)
else:
form.set_error('file', msg)
raise ValueError()
initial_workflow_name = workflow.name
workflow_names = [x.name for x in Workflow.select()]
copy_no = 1
while workflow.name in workflow_names:
if copy_no == 1:
workflow.name = _('Copy of %s') % initial_workflow_name
else:
workflow.name = _('Copy of %(name)s (%(no)d)') % {
'name': initial_workflow_name,
'no': copy_no,
}
copy_no += 1
workflow.store()
get_session().message = ('info', _('This workflow has been successfully imported.'))
return redirect('%s/' % workflow.id)