passerelle/passerelle/contrib/greco/models.py

504 lines
22 KiB
Python

# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import io
import json
import re
import urllib
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import requests
import suds.sudsobject
from django.core.cache import cache
from django.db import models
from django.utils.translation import gettext_lazy as _
from suds.client import Client
from suds.transport import Reply
from suds.transport.http import HttpAuthenticated
from passerelle.base.models import BaseResource
from passerelle.soap import sudsobject_to_dict
from passerelle.utils.api import APIError, endpoint
from .formdata import CREATION_SCHEMA, FormData, list_schema_fields
API_TYPES = (
('soap', 'SOAP'),
('rest', 'REST'),
)
# taken from https://lsimons.wordpress.com/2011/03/17/stripping-illegal-characters-out-of-xml-in-python/
_illegal_xml_chars_RE = re.compile('[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]')
def escape_xml_illegal_chars(val, replacement='?'):
return _illegal_xml_chars_RE.sub(replacement, val)
class ParameterTypeError(Exception):
http_status = 400
log_error = False
def fill_sudsobject_with_dict(sudsobject, fields, prefix=None):
for key, value in sudsobject:
if prefix:
attr = '%s_%s' % (prefix, key)
else:
attr = key
if isinstance(value, suds.sudsobject.Object):
fill_sudsobject_with_dict(value, fields, attr)
else:
if attr in fields:
# sudsobject.foo.bar <- fields['foo_bar']
field_value = fields[attr]
# duck-type unicode/str
if hasattr(field_value, 'isnumeric'):
field_value = escape_xml_illegal_chars(field_value)
setattr(sudsobject, key, fields[attr])
class Greco(BaseResource):
application = models.CharField(_('Application identifier'), max_length=200)
token_url = models.URLField(_('Token URL'), max_length=256)
token_authorization = models.CharField(_('Token Authorization'), max_length=128)
wsdl_url = models.CharField(_('WSDL or REST URL'), max_length=256) # not URLField, it can be file://
verify_cert = models.BooleanField(default=True, verbose_name=_('Check HTTPS Certificate validity'))
api_type = models.CharField(_('API Type'), max_length=4, choices=API_TYPES, default='soap')
hide_description_fields = ['token_authorization']
category = _('Business Process Connectors')
class Meta:
verbose_name = _('GRECO Webservices')
@property
def use_soap(self):
return self.api_type == 'soap'
def get_token(self, renew=False):
cache_key = 'greco-%s-token' % self.id
if not renew:
token = cache.get(cache_key)
if token:
return token
headers = {'Authorization': 'Basic %s' % self.token_authorization}
resp = self.requests.post(
self.token_url,
headers=headers,
data={'grant_type': 'client_credentials'},
verify=self.verify_cert,
timeout=60,
)
if resp.status_code >= 400:
raise APIError(
'HTTP Transport Error %s' % resp.status_code,
err_code='transport-error-%s' % resp.status_code,
)
resp = resp.json()
token = '%s %s' % (resp.get('token_type'), resp.get('access_token'))
timeout = int(resp.get('expires_in'))
cache.set(cache_key, token, timeout)
self.logger.debug('new token: %s (timeout %ss)', token, timeout)
return token
def get_client(self, attachments=None):
attachments = attachments or []
class Transport(HttpAuthenticated):
def __init__(self, instance, attachments):
self.instance = instance
self.attachments = attachments
HttpAuthenticated.__init__(self)
def send(self, request):
request.message = request.message.replace(b"contentType", b"xm:contentType")
if self.attachments:
# SOAP Attachement format
message = MIMEMultipart('related', type="text/xml", start="<rootpart@entrouvert.org>")
xml = MIMEText(None, _subtype='xml', _charset='utf-8')
xml.add_header('Content-ID', '<rootpart@entrouvert.org>')
# do not base64-encode the soap message
xml.replace_header('Content-Transfer-Encoding', '8bit')
xml_payload = request.message.decode('utf-8')
# hack payload to include attachment filenames in
# SOAP-ENV:Header.
soap_headers = []
for num, attachment in enumerate(self.attachments):
filename = attachment.get('filename') or 'file%s.bin' % num
soap_headers.append('<filename%s>%s</filename%s>' % (num, filename, num))
xml_payload = xml_payload.replace(
'<SOAP-ENV:Header/>', '<SOAP-ENV:Header>%s</SOAP-ENV:Header>' % ''.join(soap_headers)
)
xml.set_payload(xml_payload)
message.attach(xml)
for num, attachment in enumerate(self.attachments):
filename = attachment.get('filename') or 'file%s.bin' % num
content = base64.b64decode(attachment.get('content') or '')
content_type = attachment.get('content_type') or 'application/octet-stream'
maintype, subtype = content_type.split('/', 1)
if maintype == 'text':
for encoding in ('utf-8', 'iso-8859-15'):
try:
content.decode(encoding)
break
except UnicodeDecodeError:
pass
part = MIMEText(content, _subtype=subtype, _charset=encoding)
else:
part = MIMEBase(maintype, subtype, name=filename)
attachment['real_bytes'] = content
attachment['fake_bytes'] = '\ue000%s\ue000' % num
part.set_payload(attachment['fake_bytes'])
part.add_header('Content-Transfer-Encoding', 'binary')
part.add_header('Content-Disposition', 'attachment', name=filename, filename=filename)
part.add_header('Content-ID', '<%s>' % filename)
message.attach(part)
message._write_headers = lambda x: None
message.as_string(unixfrom=False)
# RFC 2045 defines MIME multipart boundaries:
# * boundary := 0*69<bchars> bcharsnospace
# * dash-boundary := "--" boundary
# * delimiter := CRLF dash-boundary
# but Python doesn't use CRLF, will only use LF (on Unix systems
# at least). This is http://bugs.python.org/issue1349106 and has
# been fixed in Python 3.2.
#
# Manually hack message to put \r\n so that the message is
# correctly read by Apache Axis strict parser.
boundary = message.get_boundary()
request.message = (
message.as_string(unixfrom=False)
.replace(boundary + '\n', boundary + '\r\n')
.replace('\n--' + boundary, '\r\n--' + boundary)
.encode('utf-8')
)
for attachment in attachments:
# substitute binary parts
if attachment.get('fake_bytes'):
request.message = request.message.replace(
attachment['fake_bytes'].encode('utf-8'), attachment['real_bytes']
)
request.headers.update(dict(message._headers))
request.headers['Authorization'] = self.instance.get_token()
resp = self.instance.requests.post(
request.url,
data=request.message,
headers=request.headers,
verify=self.instance.verify_cert,
timeout=60,
)
if resp.status_code == 401:
# ask for a new token, and retry
request.headers['Authorization'] = self.instance.get_token(renew=True)
resp = self.instance.requests.post(
request.url,
data=request.message,
headers=request.headers,
verify=self.instance.verify_cert,
timeout=60,
)
if resp.status_code >= 400 and resp.status_code != 500:
raise APIError(
'HTTP Transport Error %s' % resp.status_code,
err_code='transport-error-%s' % resp.status_code,
)
if resp.status_code == 500 and b'Fault' not in resp.content:
raise APIError('Error 500, not a SOAP Fault', err_code='transport-error-500')
return Reply(resp.status_code, resp.headers, resp.content)
return Client(url=self.wsdl_url, transport=Transport(self, attachments))
def _rest_call(self, path, method='get', json_data=None, params=None, files=None):
headers = {'Authorization': self.get_token()}
url = urllib.parse.urljoin(self.wsdl_url, path)
try:
resp = self.requests.request(
method=method,
url=url,
headers=headers,
json=json_data,
params=params,
files=files,
)
except (requests.Timeout, requests.RequestException) as e:
raise APIError(str(e))
if resp.status_code == 401:
# ask for a new token, and retry
headers['Authorization'] = self.get_token(renew=True)
try:
resp = self.requests.request(
method=method,
url=url,
headers=headers,
json=json_data,
params=params,
files=files,
)
except (requests.Timeout, requests.RequestException) as e:
raise APIError(str(e))
try:
resp.raise_for_status()
except requests.RequestException as e:
try:
err_data = resp.json()
except (json.JSONDecodeError, requests.RequestException):
err_data = {'response_text': resp.text}
raise APIError(str(e), data=err_data)
content_type = resp.headers.get('Content-Type')
if content_type and content_type.startswith('application/json'):
try:
return resp.json()
except (json.JSONDecodeError, requests.RequestException) as e:
err_data = {'response_text': resp.text}
raise APIError(str(e), data=err_data)
raise APIError(resp.text)
def check_status(self):
if self.use_soap:
response = self.get_client().service.communicationTest('ping')
else:
response = self._rest_call('communicationTest/ping').get('reponse')
if not response:
raise Exception('empty answer to communication test')
@endpoint(
perm='can_access',
description=_('Communication test'),
long_description='''Response :
{"data": "…[ping]", "err": 0}''',
display_order=1,
)
def ping(self, request):
if self.use_soap:
resp = self.get_client().service.communicationTest('ping')
else:
json_data = self._rest_call('communicationTest/ping')
resp = json_data.get('reponse')
if not resp:
raise APIError('empty response from communicationTest()')
return {'data': resp}
@endpoint(
perm='can_access',
methods=['post'],
description=_('Create a demand'),
long_description=(
'''Payload JSON :
{ "iddemande": "", "description": "", "domaineobjettype": "", "datecreation": "", "datedepot": "", "danger": "", "mediareponse": "", '''
'''"priorite": "", "application": "", "beneficiaire_civilite": "", "beneficiaire_nom": "", "beneficiaire_prenom": "", '''
'''"beneficiaire_email": "", "beneficiaire_mobile": "", "beneficiaire_telephone": "", "beneficiaire_fax": "", '''
'''"beneficiaire_numerovoie": "", "beneficiaire_voie": "", "beneficiaire_codefuvvoie": "", "beneficiaire_coderivolivoie": "", '''
'''"beneficiaire_complement": "", "beneficiaire_codepostal": "", "beneficiaire_commune": "", "beneficiaire_organisation": "", '''
'''"beneficiaire_typetiers": "", "localisation_numerovoie": "", "localisation_voie": "", "localisation_codefuvvoie": "", '''
'''"localisation_coderivolivoie": "", "localisation_complement": "", "localisation_commune": "", "localisation_voiesecante": "", '''
'''"localisation_codefuvvoiesecante": "", "localisation_coderivolivoiesecante": "", "localisation_xgeoloc": "", '''
'''"localisation_ygeoloc": "", "transmetteur_civilite": "", "transmetteur_nom": "", "transmetteur_prenom": "", '''
'''"transmetteur_email": "", "transmetteur_mobile": "", "transmetteur_telephone": "", "transmetteur_fax": "", '''
'''"transmetteur_service": "", "fichier1": {"filename": "….…", "content_type": "…/…", "content": "…base64…" }, "fichier2": { … } }
Response :
{"data": {"application": "", "etat": "1 - Créée", "iddemande": "", "idgreco": "", "motifsrejet": null}, "err": 0}'''
),
display_order=2,
)
def create(self, request):
# get creation fields from payload
try:
formdata = FormData(json.loads(request.body), CREATION_SCHEMA)
except ValueError as e:
raise ParameterTypeError(str(e))
if self.use_soap:
# create suds object from formdata
client = self.get_client(formdata.attachments)
creation = client.factory.create('DemandeCreation')
creation.application = self.application
fill_sudsobject_with_dict(creation, formdata.fields)
# send it to "creer"
resp = client.service.creer(creation)
if resp is None:
raise APIError('empty response from creer()')
data = sudsobject_to_dict(resp)
else:
files = []
for num, attachment in enumerate(formdata.attachments):
filename = attachment.get('filename') or 'file%s.bin' % num
content = base64.b64decode(attachment.get('content') or '')
content_type = attachment.get('content_type') or 'application/octet-stream'
files.append(('files', (filename, content, content_type)))
files.append(
(
'creerRequest',
(None, io.BytesIO(json.dumps(formdata.json).encode('utf-8')), 'application/json'),
)
)
data = self._rest_call('creer', method='post', files=files)
return {'data': data}
@classmethod
def creation_fields(cls):
'''used in greco_detail.html template'''
return list_schema_fields(CREATION_SCHEMA)
@endpoint(
perm='can_access',
description=_('Get a demand'),
long_description=(
'''Response :
{"data": {"commentaireReponse": null, "comptesRendus": null, "dateResolutionEffective": null, "dateResolutionPrevue": "YYYY-MM-DD", '''
'''"etat": "1 - Créée", "groupeResponsable": null, "iddemande": "", "idgreco": "", "motifsrejet": null, "suiteDonnee": null}, '''
'''"err": 0}'''
),
display_order=3,
)
def status(self, request, idgreco, iddemande=None):
params = {
'idgreco': idgreco,
'iddemande': iddemande,
}
if self.use_soap:
resp = self.get_client().service.consulter(params)
if resp is None:
raise APIError('empty response from status()')
data = sudsobject_to_dict(resp)
else:
data = self._rest_call('consulter', params=params)
return {'data': data}
@endpoint(
perm='can_access',
description=_('Get mail response to a demand'),
long_description='''Response :
{"data": {"etat": null, "idgreco": "", "iddemande": "", "motifsrejet": null, "message": ""}, "err": 0}''',
display_order=4,
)
def answer(self, request, idgreco, iddemande, code=None):
params = {
'idgreco': idgreco,
'iddemande': iddemande,
}
if code:
params['taCode'] = code
if self.use_soap:
resp = self.get_client().service.getMail(params)
if resp is None:
raise APIError('empty response from consulter()')
data = sudsobject_to_dict(resp)
else:
data = self._rest_call('getMail', params=params)
return {'data': data}
@endpoint(
perm='can_access',
methods=['get', 'post', 'put', 'patch'],
name='add-information',
description=_('Provide additional information to a demand'),
long_description='''Response :
{"data": {"iddemande": "", "idgreco": "", "motifsrejet": null}, "err": 0}''',
display_order=5,
)
def add_information(self, request, iddemande=None, idgreco=None, information=None):
if request.body:
payload = json.loads(request.body)
if not isinstance(payload, dict):
raise ParameterTypeError('payload must be a dict')
idgreco = payload.get('idgreco') or idgreco
iddemande = payload.get('iddemande') or iddemande
information = payload.get('information') or information
json_body = {
'idgreco': idgreco,
'iddemande': iddemande,
'complementInfo': information,
}
if self.use_soap:
resp = self.get_client().service.ajouterComplementInformation(json_body)
if resp is None:
raise APIError('empty response from ajouterComplementInformation()')
data = sudsobject_to_dict(resp)
else:
data = self._rest_call('ajouterComplementInformation', method='post', json_data=json_body)
return {'data': data}
@endpoint(
perm='can_access',
methods=['get', 'post', 'put', 'patch'],
description=_('Remind a demand'),
long_description='''Response :
{"data": {"iddemande": "", "idgreco": "", "motifsrejet": null}, "err": 0}''',
display_order=6,
)
def update(self, request, iddemande=None, idgreco=None, comment=None):
if request.body:
payload = json.loads(request.body)
if not isinstance(payload, dict):
raise ParameterTypeError('payload must be a dict')
idgreco = payload.get('idgreco') or idgreco
iddemande = payload.get('iddemande') or iddemande
comment = payload.get('comment') or comment
params = {
'idgreco': idgreco,
'iddemande': iddemande,
'commentaire': comment,
}
if self.use_soap:
resp = self.get_client().service.relancer(params)
if resp is None:
raise APIError('empty response from relancer()')
data = sudsobject_to_dict(resp)
else:
data = self._rest_call('relancer', params=params)
return {'data': data}
@endpoint(
perm='can_access',
methods=['post'],
name='add-confirmation',
description=_('Update number of confirmations (+1) related to a demand'),
long_description='''Response :
{"data": {"iddemande": "", "idgreco": "", "motifsrejet": null}, "err": 0}''',
display_order=7,
)
def add_confirmation(self, request):
payload = json.loads(request.body)
if not isinstance(payload, dict):
raise ParameterTypeError('payload must be a dict')
idgreco = payload.get('idgreco')
iddemande = payload.get('iddemande')
nbr = payload.get('nbconfirmation')
params = {
'idGreco': idgreco,
'idDemande': iddemande,
'nbconfirmation': nbr,
}
if self.use_soap:
resp = self.get_client().service.confirmer(params)
if resp is None:
raise APIError('empty response from confirmer()')
data = sudsobject_to_dict(resp)
else:
data = self._rest_call('confirmer', params=params)
return {'data': data}