276 lines
11 KiB
Python
276 lines
11 KiB
Python
# passerelle - uniform access to multiple data sources and services
|
|
# Copyright (C) 2017 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import re
|
|
|
|
from django.conf import settings
|
|
from django.db import models, transaction
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from passerelle.base.models import BaseResource
|
|
from passerelle.utils.api import endpoint
|
|
|
|
|
|
def get_tcl_data_url(key):
    """Return the URL of the TCL dataset named ``key``.

    The ``tclarret`` dataset is built from ``settings.TCL_GEOJSON_URL_TEMPLATE``;
    every other key is built from ``settings.TCL_URL_TEMPLATE``. In both cases
    the key is interpolated into the template with the ``%`` operator.
    """
    template = settings.TCL_GEOJSON_URL_TEMPLATE if key == 'tclarret' else settings.TCL_URL_TEMPLATE
    return template % key
|
|
|
|
|
|
class Tcl(BaseResource):
    """Connector exposing TCL stop information.

    Line and stop reference data are mirrored into the local ``Line`` and
    ``Stop`` tables by :meth:`daily`; passings are fetched from the remote
    service on every call to :meth:`stop`.
    """

    category = _('Transport')

    class Meta:
        verbose_name = _('TCL')

    @endpoint(
        pattern=r'^(?P<identifier>\w+)/?$',
        perm='can_access',
        description=_('Info about a stop'),
        example_pattern='{identifier}/',
        parameters={'identifier': {'description': _('Stop Identifier'), 'example_value': '30211'}},
    )
    def stop(self, request, identifier):
        """Return details and upcoming passings for the stop ``identifier``.

        ``identifier`` is matched against ``Stop.id_data``. The result is
        ``{'data': stop}`` where ``stop`` holds the stop name and coordinates,
        ``passings`` (sorted by ``heurepassage``) and ``passings_by_line``
        (passings grouped by line and direction, groups sorted by the
        ``heurepassage`` of their first passing).

        When the remote service reports no passing at all, ``passings_by_line``
        lists instead the known lines serving the stop (from its ``desserte``
        field), each entry carrying only a ``line_info`` dictionary.

        Raises ``Stop.DoesNotExist`` if no stop matches, and whatever
        ``response.raise_for_status()`` raises on a failed remote call.
        """
        stop_object = Stop.objects.get(id_data=identifier)
        stop = {
            'nom': stop_object.nom,
            'lat': str(stop_object.latitude),
            'lng': str(stop_object.longitude),
            'passings': [],
            'passings_by_line': [],
        }
        response = self.requests.get(
            get_tcl_data_url('tclpassagearret'), params={'field': 'id', 'value': identifier}
        )
        response.raise_for_status()

        passings_by_line = {}
        metro_lines = {
            '301': 'A',
            '302': 'B',
            '303': 'C',
            '304': 'D',
            '325': 'F1',
            '326': 'F2',
        }
        for k, v in list(metro_lines.items()):  # additional codes...
            metro_lines[k + 'A'] = v
        for passing in response.json()['values']:
            # try the raw code, then the code without its last character, then
            # the metro/funicular alias; the first local Line found wins.
            for line_code in (passing['ligne'], passing['ligne'][:-1], metro_lines.get(passing['ligne'])):
                try:
                    line = Line.objects.filter(code_titan_short=line_code)[0]
                except IndexError:
                    # fallback kept unless a later candidate matches
                    passing['line_info'] = {'ligne': passing['ligne']}
                else:
                    passing['line_info'] = line.get_info_dict()
                    break
            stop['passings'].append(passing)
            # create dictionary key from both line number and direction
            line_info_key = passing['line_info']['ligne'] + '-' + passing['direction']
            passings_by_line.setdefault(line_info_key, []).append(passing)

        stop['passings'].sort(key=lambda x: x['heurepassage'])
        stop['passings_by_line'] = sorted(
            (
                {'ligne': v[0]['line_info']['ligne'], 'line_info': v[0]['line_info'], 'passings': v}
                for v in passings_by_line.values()
            ),
            key=lambda x: x['passings'][0]['heurepassage'],
        )

        if not stop['passings']:
            # if there are no known passings, include all lines.
            stop['passings_by_line'] = []
            for line_name in sorted(stop_object.desserte.split(',')):
                # entries look like "<short code>:<A|R>"; skip empty or
                # malformed entries (e.g. an empty desserte) instead of
                # failing on tuple unpacking.
                short_code, separator, sens = line_name.partition(':')
                if not separator:
                    continue
                sens = {'R': 'Retour', 'A': 'Aller'}.get(sens, sens)
                try:
                    line = Line.objects.filter(code_titan_short=short_code, sens=sens)[0]
                except IndexError:
                    continue
                fake_passing = {}
                fake_passing['line_info'] = line.get_info_dict()
                # the direction is the second part of a "X - Y" label, if any
                fake_passing['line_info']['direction'] = (
                    line.libelle.split(' - ')[1] if ' - ' in line.libelle else ''
                )
                stop['passings_by_line'].append(fake_passing)

        return {'data': stop}

    def check_status(self):
        """Probe the remote service by querying the first known stop, if any."""
        stop_object = Stop.objects.all().first()
        if stop_object:
            # stop() looks stops up by id_data, not by primary key.
            self.stop(None, stop_object.id_data)

    def daily(self):
        """Refresh the local ``Line`` and ``Stop`` tables from the remote datasets.

        Every line and stop present in the remote data is created or updated
        (which bumps its ``last_update``); rows not touched during this run are
        then deleted. All of it happens in a single transaction.
        """
        super().daily()
        start_update = timezone.now()

        with transaction.atomic():
            for key in ('tcllignebus', 'tcllignemf', 'tcllignetram'):
                url = get_tcl_data_url(key)
                response = self.requests.get(url)
                response.raise_for_status()
                for line_data in response.json()['values']:
                    # alternative field names are mapped onto the model's ones
                    if 'code_trace' in line_data:
                        line_data['code_titan'] = line_data['code_trace']
                    if 'nom_trace' in line_data:
                        line_data['libelle'] = line_data['nom_trace']
                    line, dummy = Line.objects.get_or_create(
                        code_titan=line_data['code_titan'],
                        defaults={'transport_key': key, 'ligne': line_data['ligne']},
                    )
                    # copy every remote attribute onto the instance
                    line.__dict__.update(line_data)
                    line.transport_key = key
                    line.save()

            url = get_tcl_data_url('tclarret')
            response = self.requests.get(url)
            response.raise_for_status()
            for feature in response.json()['features']:
                arret_data = feature['properties']
                # the remote 'id' must not overwrite the model primary key
                arret_data['id_data'] = arret_data.pop('id')
                stop, dummy = Stop.objects.get_or_create(id_data=arret_data['id_data'])
                stop.__dict__.update(arret_data)
                # flags are compared against the string 't'
                stop.pmr = bool(stop.pmr == 't')
                stop.escalator = bool(stop.escalator == 't')
                stop.ascenseur = bool(stop.ascenseur == 't')
                stop.longitude = feature['geometry']['coordinates'][0]
                stop.latitude = feature['geometry']['coordinates'][1]
                stop.save()

            # anything not saved during this run is gone from the remote data
            Line.objects.filter(last_update__lt=start_update).delete()
            Stop.objects.filter(last_update__lt=start_update).delete()
|
|
|
|
|
|
class Line(models.Model):
    """A TCL line mirrored locally, with display attributes computed on save."""

    transport_key = models.CharField(max_length=20)
    indice = models.CharField(max_length=20, blank=True)
    couleur = models.CharField(max_length=20)
    ut = models.CharField(max_length=10, blank=True, null=True)
    ligne = models.CharField(max_length=10)
    code_titan = models.CharField(max_length=10)
    sens = models.CharField(max_length=10)
    libelle = models.CharField(max_length=100)
    infos = models.CharField(max_length=100, blank=True)

    # derived from code_titan and couleur in save()
    code_titan_short = models.CharField(max_length=10)
    html_bg_color = models.CharField(max_length=10)
    html_fg_color = models.CharField(max_length=10)

    # computed elements to recreate the official display of line names
    display_prefix = models.CharField(max_length=10, blank=True)
    display_suffix_text = models.CharField(max_length=10, blank=True)
    display_suffix_color = models.CharField(max_length=10, blank=True)
    display_style = models.CharField(max_length=10, blank=True)

    last_update = models.DateTimeField(_('Last update'), auto_now=True)

    def get_foreground_colour(self, background_colour):
        """Calculates the luminance of the given colour (six hexadecimal digits)
        and returns an appropriate foreground colour ('000000' or 'ffffff')."""
        # luminance coefficients taken from section C-9 from
        # http://www.faqs.org/faqs/graphics/colorspace-faq/
        brightness = (
            int(background_colour[0:2], 16) * 0.212671
            + int(background_colour[2:4], 16) * 0.715160
            + int(background_colour[4:6], 16) * 0.072169
        )
        return '000000' if brightness > 128 else 'ffffff'

    def get_info_dict(self):
        """Return the line attributes exposed in API responses, as a new dict."""
        return {
            'ligne': self.ligne,
            'libelle': self.libelle,
            'infos': self.infos,
            'transport_key': self.transport_key,
            'html_bg_color': self.html_bg_color,
            'html_fg_color': self.html_fg_color,
            'display_prefix': self.display_prefix,
            'display_suffix_text': self.display_suffix_text,
            'display_suffix_color': self.display_suffix_color,
            'display_style': self.display_style,
        }

    def save(self, *args, **kwargs):
        """Compute the derived colour, code and display fields, then save.

        Raises ``Exception`` when ``transport_key`` is not one of the known
        TCL transport keys.
        """
        if not self.couleur or self.couleur == 'None':
            self.couleur = '255 255 255'
        # couleur holds whitespace-separated decimal components
        self.html_bg_color = '%02x%02x%02x' % tuple(int(x) for x in self.couleur.split())
        self.html_fg_color = self.get_foreground_colour(self.html_bg_color)
        # drop everything from the first 'a', 'b' or 'r' onwards
        self.code_titan_short = re.sub(r'[abr].*', '', self.code_titan)
        if '-' in self.code_titan_short:
            # new code format, apparently <code>-\d
            self.code_titan_short = self.code_titan_short.split('-')[0]

        # compute display parts
        if self.transport_key == 'tcllignebus' and self.ligne.startswith('C'):
            self.display_prefix = 'C'
            self.display_suffix_text = self.ligne[1:]
            self.display_suffix_color = '#6d6e71'
        elif self.transport_key == 'tcllignebus' and self.ligne.startswith('PL'):
            self.display_prefix = 'PL'
            self.display_suffix_text = self.ligne[2:]
            self.display_suffix_color = '#0c4da2'
        elif self.transport_key == 'tcllignebus':
            self.display_prefix = 'BUS'
            self.display_suffix_text = self.ligne
            self.display_suffix_color = '#ffffff'
            self.display_style = 'busline'  # red border...
        elif self.transport_key == 'tcllignemf' and self.ligne.startswith('F'):
            self.display_prefix = 'F'
            self.display_suffix_text = self.ligne[1:]
            self.display_suffix_color = '#8cc751'
        elif self.transport_key == 'tcllignemf':
            self.display_prefix = 'M'
            self.display_suffix_text = self.ligne
            # default to '' for lines without a known colour: the column is
            # not nullable, so None would make the save fail.
            self.display_suffix_color = {
                'A': '#ee3897',
                'B': '#007dc6',
                'C': '#f99d1c',
                'D': '#00ac4f',
            }.get(self.ligne, '')
        elif self.transport_key == 'tcllignetram':
            self.display_prefix = 'T'
            self.display_suffix_text = self.ligne[1:]
            self.display_suffix_color = '#883f98'
        else:
            raise Exception('unknown TCL line %r' % self.code_titan)

        return super().save(*args, **kwargs)
|
|
|
|
|
|
class Stop(models.Model):
    """A TCL stop mirrored locally; rows are filled in by ``Tcl.daily()``."""

    # identifier of the stop in the remote dataset (distinct from the primary key)
    id_data = models.CharField(max_length=10)
    nom = models.CharField(max_length=50)
    # comma-separated list of "<line short code>:<direction>" entries
    desserte = models.CharField(max_length=200)

    # equipment flags
    pmr = models.BooleanField(default=False)
    escalator = models.BooleanField(default=False)
    ascenseur = models.BooleanField(default=False)

    # coordinates
    latitude = models.FloatField(null=True)
    longitude = models.FloatField(null=True)

    # refreshed on every save
    last_update = models.DateTimeField(verbose_name=_('Last update'), auto_now=True)
|