514 lines
19 KiB
Python
514 lines
19 KiB
Python
# w.c.s. - web application for online forms
|
|
# Copyright (C) 2005-2019 Entr'ouvert
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import copy
|
|
import csv
|
|
import datetime
|
|
import io
|
|
import json
|
|
|
|
from quixote import get_publisher, get_request, get_response, redirect
|
|
from quixote.html import htmltext
|
|
|
|
from wcs import fields
|
|
from wcs.carddef import CardDef
|
|
from wcs.categories import CardDefCategory
|
|
|
|
from ..qommon import _, errors, ngettext, template
|
|
from ..qommon.afterjobs import AfterJob
|
|
from ..qommon.backoffice.menu import html_top
|
|
from ..qommon.form import FileWidget, Form
|
|
from .management import FormBackOfficeStatusPage, FormPage, ManagementDirectory
|
|
from .submission import FormFillPage
|
|
|
|
|
|
def get_import_csv_fields(carddef):
    """Return the fields matching, in order, the columns of a CSV import.

    Only fields that are instances of ``fields.WidgetField`` are kept.  When
    ``carddef.user_support`` is ``'optional'`` a pseudo-field standing for
    the user column is placed in front of them.
    """

    class UserField:
        # pseudo-field exposing the attributes and methods that the import
        # code of this module uses on real fields.
        key = 'user'
        id = '_user'
        label = _('User (email or UUID)')

        def convert_value_from_str(self, x):
            # the cell content is kept as is
            return x

        def set_value(self, data, value):
            # stored under a reserved key of the data dictionary
            data['_user'] = value

    # skip non-data fields
    data_fields = []
    for field in carddef.fields or []:
        if isinstance(field, fields.WidgetField):
            data_fields.append(field)

    if carddef.user_support != 'optional':
        return data_fields
    return [UserField(), *data_fields]
|
|
|
|
|
|
class DataManagementDirectory(ManagementDirectory):
    """Backoffice directory giving access to card data."""

    do_not_call_in_templates = True
    _q_exports = ['']

    def add_breadcrumb(self):
        """Append the "Cards" entry to the breadcrumb of the response."""
        breadcrumb = get_response().breadcrumb
        breadcrumb.append(('data/', _('Cards')))

    def is_accessible(self, user):
        """Tell whether *user* should be given access to this directory.

        Access requires backoffice access, and then either global access to
        the 'cards' section (as long as some card model exists) or a role
        that appears in the submission or workflow roles of a card model.
        """
        if not user.can_go_in_backoffice():
            return False

        backoffice_root = get_publisher().get_backoffice_root()
        if backoffice_root.is_global_accessible('cards') and CardDef.keys():
            # open for admins as soon as there are cards
            return True

        # only include data management if there are accessible cards
        for carddef in CardDef.select(ignore_errors=True, lightweight=True, iterator=True):
            has_matching_role = any(
                role_id in (carddef.backoffice_submission_roles or [])
                or role_id in (carddef.workflow_roles or {}).values()
                for role_id in user.get_roles()
            )
            if has_matching_role:
                return True
        return False

    def get_carddefs(self):
        """Yield the card models visible to the current user.

        Models are yielded category by category (categories sorted by
        position), and models without a category come last.  Nothing is
        yielded when the request has no user.
        """
        user = get_request().user
        if not user:
            return

        visible = [
            carddef
            for carddef in CardDef.select(order_by='name', ignore_errors=True, lightweight=True)
            if user.is_admin or carddef.is_of_concern_for_user(user)
        ]
        categories = CardDefCategory.select()
        CardDefCategory.sort_by_position(categories)

        # the trailing None stands for "no category"
        for category in categories + [None]:
            for carddef in visible:
                if category is None:
                    matches = not carddef.category_id
                else:
                    matches = carddef.category_id == category.id
                if matches:
                    yield carddef

    def _q_index(self):
        """Render the index page, or the empty-site message without cards."""
        html_top('data_management', _('Cards'))
        if CardDef.exists():
            return template.QommonTemplateResponse(
                templates=['wcs/backoffice/data-management.html'], context={'view': self}
            )
        return self.empty_site_message(_('Cards'))

    def _q_lookup(self, component):
        """Hand the URL component over to the page of a card model."""
        return CardPage(component)
|
|
|
|
|
|
class CardPage(FormPage):
    """Backoffice page of a card model: listing, sample file and imports."""

    _q_exports = [
        '',
        'csv',
        'ods',
        'json',
        'export',
        'map',
        'geojson',
        'add',
        'actions',
        ('export-spreadsheet', 'export_spreadsheet'),
        ('save-view', 'save_view'),
        ('delete-view', 'delete_view'),
        ('import-file', 'import_file'),
        ('filter-options', 'filter_options'),
        ('data-sample-csv', 'data_sample_csv'),
    ]
    admin_permission = 'cards'
    formdef_class = CardDef
    export_data_label = _('Export Data')
    search_label = _('Search in card content')
    formdef_view_label = _('View Card')
    has_json_export_support = True
    ensure_parent_category_in_url = False

    @property
    def add(self):
        """Page used to create a new card for this model."""
        return CardFillPage(self.formdef.url_name)

    def listing_top_actions(self):
        """Return the "Add" link, or an empty string if the user cannot add cards."""
        if not self.formdef.can_user_add_cards(get_request().user):
            return ''
        return htmltext('<span class="actions"><a href="./add/">%s</a></span>') % _('Add')

    def get_default_filters(self, mode):
        """Return the default filters of the current view, or an empty tuple."""
        if self.view:
            return self.view.get_default_filters()
        return ()

    def get_default_columns(self):
        """Return the identifiers of the columns displayed by default.

        The current view decides when there is one; otherwise 'id' and
        'time' are followed by the fields flagged to be included in listings.
        """
        if self.view:
            field_ids = self.view.get_columns()
        else:
            field_ids = ['id', 'time']
            for field in self.formdef.get_all_fields():
                if hasattr(field, 'get_view_value') and field.include_in_listing:
                    field_ids.append(field.id)
        return field_ids

    def get_filter_from_query(self, default=Ellipsis):
        """Same as the parent method but with 'all' as fallback default."""
        # Ellipsis is the "not given" marker, so that callers can still pass
        # any explicit value, None included.
        return super().get_filter_from_query(default='all' if default is Ellipsis else default)

    def get_formdata_sidebar_actions(self, qs=''):
        """Add the file import link to the sidebar actions of the parent."""
        r = super().get_formdata_sidebar_actions(qs=qs)
        if self.formdef.can_user_add_cards(get_request().user):
            r += htmltext('<li><a rel="popup" href="import-file">%s</a></li>') % _('Import data from a file')
        return r

    def data_sample_csv(self):
        """Return a sample CSV file: a header line and one line of examples.

        The response is served as a ``text/plain`` attachment named after
        the URL name of the card model.
        """
        carddef_fields = get_import_csv_fields(self.formdef)
        output = io.StringIO()
        if len(carddef_fields) == 1:
            # single column: no quoting at all, private-use characters serve
            # as delimiter and escape character.
            csv_output = csv.writer(output, quoting=csv.QUOTE_NONE, delimiter='\uE000', escapechar='\uE001')
        else:
            csv_output = csv.writer(output, quoting=csv.QUOTE_ALL)

        csv_output.writerow([f.label for f in carddef_fields])
        sample_line = []
        for f in carddef_fields:
            if f.convert_value_from_str is None:
                # this column cannot be imported, say so in the sample
                value = _('will be ignored - type %s not supported') % f.get_type_label()
            elif isinstance(f, fields.DateField):
                value = datetime.date.today()
            elif isinstance(f, fields.BoolField):
                value = _('Yes')
            elif isinstance(f, fields.EmailField):
                value = 'foo@example.com'
            elif isinstance(f, fields.MapField):
                value = get_publisher().get_default_position()
            elif isinstance(f, fields.ItemsField):
                value = 'id1|id2|...'
            else:
                value = 'value'
            sample_line.append(value)
        csv_output.writerow(sample_line)

        response = get_response()
        response.set_content_type('text/plain')
        response.set_header(
            'content-disposition', 'attachment; filename=%s-sample.csv' % self.formdef.url_name
        )
        return output.getvalue()

    def import_file(self):
        """Display and process the file import form.

        The uploaded content is first parsed as JSON; if that fails it is
        handled as CSV.  A ``ValueError`` raised by either import is
        reported as an error on the file widget.

        Raises:
            errors.AccessForbiddenError: the user is not allowed to add cards.
        """
        if not self.formdef.can_user_add_cards(get_request().user):
            raise errors.AccessForbiddenError()
        context = {'required_fields': []}

        form = Form(enctype='multipart/form-data', use_tokens=False)
        form.add(FileWidget, 'file', title=_('File'), required=True)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))
        if form.get_widget('cancel').parse():
            return redirect('.')

        if form.is_submitted() and not form.has_errors():
            file_content = form.get_widget('file').parse().fp.read()
            try:
                json_content = json.loads(file_content)
            except ValueError:
                # not json -> CSV
                try:
                    return self.import_csv_submit(file_content, submission_agent_id=get_request().user.id)
                except ValueError as e:
                    form.set_error('file', e)
            else:
                try:
                    return self.import_json_submit(json_content)
                except ValueError as e:
                    form.set_error('file', e)

        get_response().breadcrumb.append(('import-file', _('Import File')))
        html_top('data_management', _('Import File'))
        context['html_form'] = form
        context['impossible_csv_fields'] = self.get_csv_impossible_fields()

        return template.QommonTemplateResponse(
            templates=['wcs/backoffice/card-data-import-form.html'], context=context
        )

    def get_csv_impossible_fields(self):
        """Return the labels of required fields that cannot be set from CSV.

        Those are the fields whose ``convert_value_from_str`` is None.
        """
        impossible_fields = []
        for field in get_import_csv_fields(self.formdef):
            # the user pseudo-field has no "required" attribute
            if not hasattr(field, 'required'):
                continue
            if field.required and field.convert_value_from_str is None:
                impossible_fields.append(field.label)
        return impossible_fields

    def import_csv_submit(self, content, afterjob=True, api=False, submission_agent_id=None):
        """Validate CSV *content* and import its lines as cards.

        Args:
            content: raw bytes of the file; decoded as UTF-8, falling back
                to ISO-8859-15.
            afterjob: when true the import job is registered on the
                response, otherwise it is executed right away.
            api: with *afterjob*, return the job instead of a redirection.
            submission_agent_id: recorded as submission agent of the cards.

        Returns:
            The job (*afterjob* and *api*), a redirection to the processing
            page of the job (*afterjob* only), or None when the job was
            executed directly.

        Raises:
            ValueError: the content has NUL bytes, a required field cannot
                be filled from CSV, the file is empty, the header has less
                columns than the card has fields, or a line does not have
                exactly one column per field.
        """
        if b'\0' in content:
            raise ValueError(_('Invalid file format.'))

        impossible_fields = self.get_csv_impossible_fields()
        if impossible_fields:
            error = ngettext(
                '%s is required but cannot be filled from CSV.',
                '%s are required but cannot be filled from CSV.',
                len(impossible_fields),
            ) % ', '.join(impossible_fields)
            raise ValueError(error)

        for charset in ('utf-8', 'iso-8859-15'):
            try:
                content = content.decode(charset)
                break
            except UnicodeDecodeError:
                continue

        try:
            dialect = csv.Sniffer().sniff(content)
        except csv.Error:
            # format could not be guessed, let the reader use its defaults
            dialect = None

        reader = csv.reader(content.splitlines(keepends=True), dialect=dialect)
        try:
            caption = next(reader)
        except StopIteration:
            raise ValueError(_('Invalid CSV file.'))

        carddef_fields = get_import_csv_fields(self.formdef)
        if len(caption) < len(carddef_fields):
            raise ValueError(_('CSV file contains less columns than card fields.'))

        data_lines = []
        incomplete_lines = []
        for line_no, csv_line in enumerate(reader):
            if len(csv_line) != len(carddef_fields):
                # +2 because header and counting from 1.
                incomplete_lines.append(str(line_no + 2))
                continue
            data_lines.append(csv_line)

        if incomplete_lines:
            error_message = _('CSV file contains lines with wrong number of columns.')
            # at most five line numbers are displayed; "and more" is only
            # appended when some line numbers are really left out.
            displayed_lines = ', '.join(incomplete_lines[:5])
            if len(incomplete_lines) <= 5:
                error_message += ' ' + _('(line numbers %s)') % displayed_lines
            else:
                error_message += ' ' + _('(line numbers %s and more)') % displayed_lines
            raise ValueError(error_message)

        job = ImportFromCsvAfterJob(
            carddef=self.formdef, data_lines=data_lines, submission_agent_id=submission_agent_id
        )
        if afterjob:
            get_response().add_after_job(job)
            if api:
                return job
            return redirect(job.get_processing_url())
        else:
            job.execute()

    def import_json_submit(self, json_content, afterjob=True, api=False):
        """Import cards from already parsed JSON content.

        Args:
            json_content: decoded JSON; must be a dictionary with a 'data' key.
            afterjob: when true the import job is registered on the
                response, otherwise it is executed right away.
            api: with *afterjob*, return the job instead of a redirection.

        Returns:
            The job (*afterjob* and *api*), a redirection to the processing
            page of the job (*afterjob* only), or None when the job was
            executed directly.

        Raises:
            ValueError: the content does not have the expected structure.
        """
        # basic check, looks like valid json card content?
        if not isinstance(json_content, dict) or 'data' not in json_content:
            raise ValueError(_('Invalid JSON file'))

        job = ImportFromJsonAfterJob(carddef=self.formdef, json_content=json_content)
        if afterjob:
            get_response().add_after_job(job)
            if api:
                return job
            return redirect(job.get_processing_url())
        else:
            job.execute()

    def _q_lookup(self, component):
        """Resolve *component* as a view, else as the identifier of a card.

        Raises:
            errors.TraversalError: no card has this identifier.
        """
        view_lookup_response = self._q_lookup_view(component)
        if view_lookup_response is not None:
            return view_lookup_response

        try:
            filled = self.formdef.data_class().get(component)
        except KeyError:
            raise errors.TraversalError()
        return CardBackOfficeStatusPage(self.formdef, filled, parent_view=self)
|
|
|
|
|
|
class CardFillPage(FormFillPage):
    """Backoffice page used to fill in a new card."""

    formdef_class = CardDef
    has_channel_support = False
    has_user_support = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # user selection is only offered by card models that ask for it
        if self.formdef.user_support == 'optional':
            self.has_user_support = True

    def get_default_return_url(self):
        """Return the URL of the card listing, or of the cards index page."""
        if not self.formdef.is_of_concern_for_user(get_request().user):
            # redirect to cards index page if the user is not allowed to see the cards
            return '%s/data/' % get_publisher().get_backoffice_url()
        return '%s/data/%s/' % (get_publisher().get_backoffice_url(), self.formdef.url_name)

    def redirect_after_submitted(self, url, filled):
        """Answer with the popup response template when submitted from a popup.

        Without the ``_popup`` request parameter the parent behaviour applies.
        """
        if not get_request().form.get('_popup'):
            return super().redirect_after_submitted(url, filled)

        # identifier and digest of the new card, serialized for the template
        created_card = {
            'value': str(filled.id),
            'obj': str(filled.default_digest),
        }
        return template.QommonTemplateResponse(
            templates=['wcs/backoffice/popup_response.html'],
            context={'popup_response_data': json.dumps(created_card)},
            is_django_native=True,
        )

    def create_form(self, *args, **kwargs):
        """Create the form, carrying over the ``_popup`` marker if present."""
        form = super().create_form(*args, **kwargs)
        in_popup = get_request().form.get('_popup')
        if in_popup:
            form.add_hidden('_popup', 1)
        return form
|
|
|
|
|
|
class CardBackOfficeStatusPage(FormBackOfficeStatusPage):
    """Backoffice page of a single card."""

    form_page_class = CardFillPage

    # sidebar messages, worded for cards
    sidebar_recorded_message = _('The card has been recorded on %(date)s with the number %(number)s.')
    sidebar_recorded_by_agent_message = _(
        'The card has been recorded on %(date)s with the number %(number)s by %(agent)s.'
    )

    def html_top(self, title=None):
        """Render the page top within the 'data_management' section."""
        section = 'data_management'
        return html_top(section, title)

    def should_fold_summary(self, mine, request_user):
        """Always False, whatever the arguments."""
        return False

    def should_fold_history(self):
        """Always True."""
        return True
|
|
|
|
|
|
class ImportFromCsvAfterJob(AfterJob):
    """Job creating one card per line of an already validated CSV file."""

    def __init__(self, carddef, data_lines, submission_agent_id):
        # the card model is referenced by class and identifier, and looked
        # up again when the job runs.
        super().__init__(
            label=_('Importing data into cards'),
            carddef_class=carddef.__class__,
            carddef_id=carddef.id,
            data_lines=data_lines,
            submission_agent_id=submission_agent_id,
        )

    def user_lookup(self, user_value):
        """Return the user matching *user_value*.

        Always None unless the card model has optional user support.
        """
        if self.carddef.user_support == 'optional':
            return get_publisher().user_class.lookup_by_string(user_value)
        return None

    def execute(self):
        """Create, store and run the workflow of a card for every line."""
        self.carddef = self.kwargs['carddef_class'].get(self.kwargs['carddef_id'])
        carddata_class = self.carddef.data_class()
        self.submission_agent_id = self.kwargs['submission_agent_id']
        self.total_count = len(self.kwargs['data_lines'])
        self.store()

        carddef_fields = get_import_csv_fields(self.carddef)

        for csv_line in self.kwargs['data_lines']:
            carddata = carddata_class()
            carddata.data = {}

            for position, field in enumerate(carddef_fields):
                cell = csv_line[position].strip()
                # skip empty values and unsupported field types
                if not cell or field.convert_value_from_str is None:
                    continue
                field.set_value(carddata.data, field.convert_value_from_str(cell))

            # the user column is not card data, take it out before storing
            carddata.user = self.user_lookup(carddata.data.pop('_user', None))
            carddata.submission_agent_id = self.submission_agent_id
            carddata.submission_channel = 'file-import'
            carddata.just_created()
            carddata.store()

            publisher = get_publisher()
            publisher.substitutions.reset()
            publisher.substitutions.feed(publisher)
            publisher.substitutions.feed(carddata)

            carddata.record_workflow_event('csv-import-created')
            carddata.perform_workflow()
            self.increment_count()

    def done_action_url(self):
        """Return the URL of the card model."""
        carddef_class = self.kwargs['carddef_class']
        return carddef_class.get(self.kwargs['carddef_id']).get_url()

    def done_action_label(self):
        """Return the label of the action offered once the job is done."""
        return _('Back to Listing')

    def done_button_attributes(self):
        """Return the extra attributes of the button of the done action."""
        return {'data-redirect-auto': 'true'}
|
|
|
|
|
|
class ImportFromJsonAfterJob(AfterJob):
    """Job creating one card per entry of the 'data' list of a JSON export."""

    def __init__(self, carddef, json_content):
        # the card model is referenced by class and identifier, and looked
        # up again when the job runs.
        super().__init__(
            label=_('Importing data into cards'),
            carddef_class=carddef.__class__,
            carddef_id=carddef.id,
            json_content=json_content,
        )

    @property
    def carddef(self):
        """Card model of the job, looked up again on every access."""
        return self.kwargs['carddef_class'].get(self.kwargs['carddef_id'])

    def execute(self):
        """Create and store a card for every entry of the JSON content.

        Entries without a 'workflow' key are handled as new cards and have
        their workflow performed; the other ones get the backoffice fields
        and the status found in the file.
        """
        json_content = self.kwargs['json_content']
        from wcs.api import posted_json_data_to_formdata_data

        # the property looks the card model up on every access, do it once
        # for the whole import.
        carddef = self.carddef
        carddata_class = carddef.data_class()

        # record the total before looping, as the CSV import job does
        self.total_count = len(json_content['data'])
        self.store()

        for json_data in json_content['data']:
            json_data = copy.deepcopy(json_data)
            carddata = carddata_class()
            carddata.data = posted_json_data_to_formdata_data(carddef, json_data['fields'])
            if 'user' in json_data:
                carddata.set_user_from_json(json_data['user'])

            carddata.just_created()

            if 'workflow' not in json_data:
                # perform as new
                carddata.store()

                get_publisher().substitutions.reset()
                get_publisher().substitutions.feed(get_publisher())
                get_publisher().substitutions.feed(carddata)
                carddata.record_workflow_event('json-import-created')
                carddata.perform_workflow()
            else:
                if 'fields' in json_data['workflow']:
                    backoffice_data_dict = posted_json_data_to_formdata_data(
                        carddef, json_data['workflow']['fields']
                    )
                    carddata.data.update(backoffice_data_dict)

                status = json_data['workflow'].get('real_status') or json_data['workflow'].get('status')
                # NOTE(review): the card gets the status id while the last
                # evolution gets the whole status dictionary, and a workflow
                # entry without status fails on .get() - confirm both are
                # intended.
                carddata.status = status.get('id')
                carddata.evolution[-1].status = status
                carddata.store()
                carddata.record_workflow_event('json-import-created')

            self.increment_count()

    def done_action_url(self):
        """Return the URL of the card model."""
        return self.carddef.get_url()

    def done_action_label(self):
        """Return the label of the action offered once the job is done."""
        return _('Back to Listing')

    def done_button_attributes(self):
        """Return the extra attributes of the button of the done action."""
        return {'data-redirect-auto': 'true'}
|