combo/combo/apps/maps/models.py

610 lines
22 KiB
Python

# combo - content management system
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import json
import pyproj
from django import forms
from django.conf import settings
from django.core import serializers, validators
from django.db import models
from django.db.models import OuterRef, Subquery
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.html import escape
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from requests.exceptions import RequestException
from requests.models import PreparedRequest
from combo.data.library import register_cell_class
from combo.data.models import CellBase
from combo.data.utils import ImportSiteError
from combo.utils import get_templated_url, requests
KIND = [
('tiles', _('Tiles')),
('geojson', _('GeoJSON')),
]
ICONS = [
('ambulance', _('Ambulance')),
('asterisk', _('Asterisk')),
('bell', _('Bell')),
('bicycle', _('Bicycle')),
('book', _('Book')),
('broken_chain', _('Broken chain')),
('building', _('Building')),
('bus', _('Bus')),
('car', _('Car')),
('checkmark', _('Checkmark')),
('cube', _('Cube')),
('drop', _('Drop')),
('eye', _('Eye')),
('flag', _('Flag')),
('gavel', _('Gavel')),
('hospital', _('Hospital')),
('house', _('House')),
('lightbulb', _('Lightbulb')),
('map_signs', _('Map signs')),
('motorcycle', _('Motorcycle')),
('paint_brush', _('Paint brush')),
('paw', _('Paw')),
('recycle', _('Recycle')),
('road', _('Road')),
('shower', _('Shower')),
('star', _('Star')),
('subway', _('Subway')),
('taxi', _('Taxi')),
('train', _('Train')),
('trash', _('Trash')),
('tree', _('Tree')),
('truck', _('Truck')),
('university', _('University')),
('warning', _('Warning')),
('wheelchair', _('Wheelchair')),
]
MARKER_SIZES = [
('_small', _('Small')),
('_medium', _('Medium')),
('_large', _('Large')),
]
MARKER_BEHAVIOUR_ONCLICK = [
('none', _('Nothing')),
('display_data', _('Display data in popup')),
]
ZOOM_LEVELS = [
('0', _('Whole world')),
('6', _('Country')),
('9', _('Wide area')),
('11', _('Area')),
('13', _('Town')),
('16', _('Small road')),
('18', _('Neighbourhood')),
('19', _('Ant')),
]
class MapLayerManager(models.Manager):
def get_by_natural_key(self, slug):
return self.get(slug=slug)
class MapLayer(models.Model):
objects = MapLayerManager()
label = models.CharField(_('Label'), max_length=128)
slug = models.SlugField(_('Identifier'), unique=True)
kind = models.CharField(max_length=10, choices=KIND, default='geojson')
tiles_template_url = models.CharField(
_('Tiles URL'),
max_length=1024,
help_text=_('For example: %s') % settings.COMBO_MAP_TILE_URLTEMPLATE,
blank=True,
null=True,
)
tiles_attribution = models.CharField(
_('Attribution'),
max_length=1024,
help_text=_('For example: %s') % escape(settings.COMBO_MAP_ATTRIBUTION),
blank=True,
null=True,
)
tiles_default = models.BooleanField(_('Default tiles layer'), default=False)
geojson_url = models.CharField(_('Geojson URL'), max_length=1024)
marker_colour = models.CharField(_('Marker or surface colour'), max_length=100, default='#0000FF')
marker_size = models.CharField(_('Marker size'), max_length=10, default='_large', choices=MARKER_SIZES)
icon = models.CharField(_('Marker icon'), max_length=32, blank=True, null=True, choices=ICONS)
icon_colour = models.CharField(_('Icon colour'), max_length=7, default='#000000')
cache_duration = models.PositiveIntegerField(_('Cache duration'), default=60, help_text=_('In seconds.'))
include_user_identifier = models.BooleanField(_('Include user identifier in request'), default=True)
geojson_query_parameter = models.CharField(
_("Query parameter for fulltext requests"),
max_length=100,
blank=True,
help_text=_('Name of the parameter to use for querying the GeoJSON layer (typically, q)'),
)
geojson_accepts_circle_param = models.BooleanField(
_('GeoJSON URL accepts a "cirle" parameter'), default=False
)
class Meta:
ordering = ('label',)
def save(self, *args, **kwargs):
if not self.slug:
base_slug = slugify(self.label)[:45]
slug = base_slug
i = 1
while MapLayer.objects.filter(slug=slug).exists():
slug = '%s-%s' % (base_slug, i)
i += 1
self.slug = slug
super().save(*args, **kwargs)
def __str__(self):
return str(self.label)
def natural_key(self):
return (self.slug,)
@classmethod
def get_default_tiles_layer(cls):
return cls.objects.filter(kind='tiles', tiles_default=True).first()
@classmethod
def export_all_for_json(cls):
return [x.get_as_serialized_object() for x in MapLayer.objects.all()]
def get_as_serialized_object(self):
serialized_layer = json.loads(
serializers.serialize(
'json', [self], use_natural_foreign_keys=True, use_natural_primary_keys=True
)
)[0]
del serialized_layer['model']
return serialized_layer
@classmethod
def load_serialized_objects(cls, json_site):
for json_layer in json_site:
cls.load_serialized_object(json_layer)
@classmethod
def load_serialized_object(cls, json_layer):
json_layer['model'] = 'maps.maplayer'
layer, dummy = MapLayer.objects.get_or_create(slug=json_layer['fields']['slug'])
json_layer['pk'] = layer.id
layer = next(serializers.deserialize('json', json.dumps([json_layer]), ignorenonexistent=True))
layer.save()
def get_geojson(self, request, properties=''):
geojson_url = get_templated_url(self.geojson_url)
query_parameter = self.geojson_query_parameter
if request and query_parameter and query_parameter in request.GET:
req = PreparedRequest()
req.prepare_url(geojson_url, {'q': request.GET[query_parameter]})
geojson_url = req.url
distance = center_lat = center_lng = None
try:
distance = float(request.GET['distance'])
center_lat = float(request.GET['lat'])
center_lng = float(request.GET['lng'])
distance_params = True
except (AttributeError, ValueError, KeyError):
distance_params = False
if request and self.geojson_accepts_circle_param and distance_params:
req = PreparedRequest()
req.prepare_url(geojson_url, {'circle': '%s,%s,%s' % (center_lng, center_lat, distance)})
geojson_url = req.url
try:
response = requests.get(
geojson_url,
remote_service='auto',
cache_duration=self.cache_duration,
user=request.user if (request and self.include_user_identifier) else None,
without_user=not (self.include_user_identifier),
headers={'accept': 'application/json'},
)
response.raise_for_status()
except RequestException as e:
return {
'type': 'FeatureCollection',
'features': [],
'_combo_err_desc': "Bad response from requested URL (%s)" % e,
}
try:
data = response.json()
except json.JSONDecodeError:
return {
'type': 'FeatureCollection',
'features': [],
'_combo_err_desc': "Non JSON response from requested URL",
}
if data is None:
return {
'type': 'FeatureCollection',
'features': [],
'_combo_err_desc': "Empty JSON response",
}
if 'features' in data:
features = data['features']
else:
features = data
if not isinstance(features, list):
return {
'type': 'FeatureCollection',
'features': [],
'_combo_err_desc': "Wrong GeoJSON response",
}
if features:
if not isinstance(features[0], dict):
return {
'type': 'FeatureCollection',
'features': [],
'_combo_err_desc': "Wrong GeoJSON response",
}
if not ({'geometry', 'properties'} <= set(features[0].keys())):
return {
'type': 'FeatureCollection',
'features': [],
'_combo_err_desc': "Wrong GeoJSON response",
}
if properties:
properties = [x.strip() for x in properties.split(',') if x.strip()]
for feature in features:
property_values = feature['properties']
if 'display_fields' in feature['properties']:
# w.c.s. content, filter fields on varnames
feature['properties']['display_fields'] = [
x for x in feature['properties']['display_fields'] if x.get('varname') in properties
]
else:
# classic geojson, filter properties
feature['properties'] = dict(
[x for x in feature['properties'].items() if x[0] in properties]
)
# keep the property for marker's colour
if self.marker_colour and not self.marker_colour.startswith('#'):
first_dotted_name = self.marker_colour.split('.')[0]
if first_dotted_name in property_values:
feature['properties'][first_dotted_name] = property_values[first_dotted_name]
if request and not self.geojson_accepts_circle_param and distance_params:
geod = pyproj.Geod(ellps='WGS84')
south_lat = geod.fwd(center_lng, center_lat, 180, distance)[1]
north_lat = geod.fwd(center_lng, center_lat, 0, distance)[1]
east_lng = geod.fwd(center_lng, center_lat, 90, distance)[0]
west_lng = geod.fwd(center_lng, center_lat, -90, distance)[0]
def match_geom(feature):
if feature['geometry']['type'] != 'Point':
return True
lng, lat = feature['geometry']['coordinates']
return bool(west_lng < lng < east_lng and south_lat < lat < north_lat)
features = [x for x in features if match_geom(x)]
if request and not query_parameter and request.GET.get('q'):
# all words must match
query_words = [slugify(x) for x in request.GET['q'].split()]
def match_words(feature):
matching_query_words = set()
feature_words = [self.label]
def get_feature_words(properties):
for property in properties.values():
if isinstance(property, str):
feature_words.append(property)
elif isinstance(property, dict):
get_feature_words(property)
get_feature_words(feature['properties'])
for feature_word in feature_words:
for word in query_words:
if word in slugify(feature_word):
matching_query_words.add(word)
if len(matching_query_words) == len(query_words):
return True
return False
features = [x for x in features if match_words(x)]
return {'type': 'FeatureCollection', 'features': features}
def can_have_assets(self):
return self.get_asset_slot_templates()
def get_label_for_asset(self):
return _('"%s" map layer') % self.label
def get_asset_slot_key(self, key):
return 'maplayer:%s:%s' % (key, self.slug)
def get_asset_slot_templates(self):
return settings.COMBO_MAP_LAYER_ASSET_SLOTS or {}
def get_asset_slot_keys(self):
if not self.can_have_assets():
return {}
slot_templates = self.get_asset_slot_templates()
return {self.get_asset_slot_key(key): key for key in slot_templates.keys()}
def get_asset_slots(self):
if not self.can_have_assets():
return {}
slot_templates = self.get_asset_slot_templates()
slots = {}
for slot_template_key, slot_template_data in slot_templates.items():
suffix = ''
if slot_template_data.get('suffix'):
suffix = ' (%s)' % slot_template_data['suffix']
slot_key = self.get_asset_slot_key(slot_template_key)
label = '%(prefix)s%(label)s%(suffix)s' % {
'prefix': slot_template_data['prefix'],
'label': self.get_label_for_asset(),
'suffix': suffix,
}
short_label = '%(prefix)s%(suffix)s' % {'prefix': slot_template_data['prefix'], 'suffix': suffix}
slots[slot_key] = {
'label': label,
'short_label': short_label,
}
slots[slot_key].update(slot_template_data)
return slots
@register_cell_class
class Map(CellBase):
title = models.CharField(_('Title'), max_length=150, blank=True)
initial_state = models.CharField(
_('Initial state'),
max_length=20,
choices=[
('default-position', _('Centered on default position')),
('device-location', _('Centered on device location')),
('fit-markers', _('Centered to fit all markers')),
],
default='default-position',
)
initial_zoom = models.CharField(_('Initial zoom level'), max_length=2, choices=ZOOM_LEVELS, default='13')
min_zoom = models.CharField(_('Minimal zoom level'), max_length=2, choices=ZOOM_LEVELS, default='0')
max_zoom = models.CharField(_('Maximal zoom level'), max_length=2, choices=ZOOM_LEVELS, default=19)
group_markers = models.BooleanField(_('Group markers in clusters'), default=False)
marker_behaviour_onclick = models.CharField(
_('Marker behaviour on click'), max_length=32, default='none', choices=MARKER_BEHAVIOUR_ONCLICK
)
layers = models.ManyToManyField(MapLayer, through='MapLayerOptions', verbose_name=_('Layers'), blank=True)
default_template_name = 'maps/map_cell.html'
manager_form_template = 'maps/map_cell_form.html'
class Meta:
verbose_name = _('Map')
class Media:
js = (
'/jsi18n',
'xstatic/leaflet.js',
'js/leaflet-gps.js',
'js/combo.map.js',
'xstatic/leaflet.markercluster.js',
'xstatic/leaflet-gesture-handling.min.js',
)
css = {
'all': ('xstatic/leaflet.css', 'css/combo.map.css', 'xstatic/leaflet-gesture-handling.min.css')
}
def get_default_position(self):
return settings.COMBO_MAP_DEFAULT_POSITION
def get_default_form_class(self):
fields = (
'initial_state',
'group_markers',
'marker_behaviour_onclick',
)
return forms.models.modelform_factory(self.__class__, fields=fields)
def get_manager_tabs(self):
tabs = super().get_manager_tabs()
tabs.insert(
1,
{
'slug': 'zoom',
'name': _('Zoom'),
'fields': ['initial_zoom', 'min_zoom', 'max_zoom'],
},
)
return tabs
@classmethod
def is_enabled(cls):
return MapLayer.objects.exists()
def get_tiles_layers(self):
tiles_layers = []
options_qs = (
self.maplayeroptions_set.filter(map_layer__kind='tiles')
.select_related('map_layer')
.order_by('-opacity')
)
for options in options_qs:
tiles_layers.append(
{
'tile_urltemplate': options.map_layer.tiles_template_url,
'map_attribution': options.map_layer.tiles_attribution,
'opacity': options.opacity or 0,
}
)
# check if at least one layer with opacity set to 1 exists
if any([l['opacity'] == 1 for l in tiles_layers]):
return tiles_layers
# add the default tiles layer
default_tiles_layer = MapLayer.get_default_tiles_layer()
if default_tiles_layer is not None:
tiles_layers.insert(
0,
{
'tile_urltemplate': default_tiles_layer.tiles_template_url,
'map_attribution': default_tiles_layer.tiles_attribution,
'opacity': 1,
},
)
else:
tiles_layers.insert(
0,
{
'tile_urltemplate': settings.COMBO_MAP_TILE_URLTEMPLATE,
'map_attribution': force_str(settings.COMBO_MAP_ATTRIBUTION),
'opacity': 1,
},
)
return tiles_layers
def get_geojson_layers(self):
if not self.pk:
return []
options = MapLayerOptions.objects.filter(map_cell=self, map_layer=OuterRef('pk'))
layers = self.layers.filter(kind='geojson').annotate(
properties=Subquery(options.values('properties'))
)
return [
{
'url': reverse('mapcell-geojson', kwargs={'cell_id': self.pk, 'layer_slug': l.slug}),
'slug': l.slug,
'icon': l.icon,
'icon_colour': l.icon_colour,
'marker_colour': l.marker_colour,
'marker_size': l.marker_size,
'properties': [x.strip() for x in l.properties.split(',')],
}
for l in layers
]
def get_cell_extra_context(self, context):
ctx = super().get_cell_extra_context(context)
ctx['title'] = self.title
default_position = self.get_default_position()
ctx['init_lat'] = default_position['lat']
ctx['init_lng'] = default_position['lng']
ctx['initial_state'] = self.initial_state
ctx['initial_zoom'] = self.initial_zoom
ctx['min_zoom'] = self.min_zoom
ctx['max_zoom'] = self.max_zoom
ctx['geojson_layers'] = self.get_geojson_layers()
ctx['tiles_layers'] = self.get_tiles_layers()
ctx['max_bounds'] = settings.COMBO_MAP_MAX_BOUNDS
ctx['group_markers'] = self.group_markers
ctx['marker_behaviour_onclick'] = self.marker_behaviour_onclick
return ctx
def get_maplayer_options(self):
return self.maplayeroptions_set.order_by('map_layer__kind', 'map_layer__label')
def get_free_geojson_layers(self):
used_layers = self.maplayeroptions_set.values('map_layer')
return MapLayer.objects.filter(kind='geojson').exclude(pk__in=used_layers)
def get_free_tiles_layers(self):
used_layers = self.maplayeroptions_set.values('map_layer')
return MapLayer.objects.filter(kind='tiles').exclude(pk__in=used_layers)
def export_subobjects(self):
return {
'layers': [x.get_as_serialized_object() for x in MapLayerOptions.objects.filter(map_cell=self)]
}
@classmethod
def prepare_serialized_data(cls, cell_data):
# ensure compatibility with old exports
if 'layers' in cell_data['fields']:
layers = cell_data['fields'].pop('layers')
cell_data['layers'] = [
{'fields': {'map_layer': layer}, 'model': 'maps.maplayeroptions'} for layer in layers
]
for layer_options in cell_data.get('layers') or []:
maplayer_slug = layer_options['fields']['map_layer'][0]
try:
MapLayer.objects.get(slug=maplayer_slug)
except MapLayer.DoesNotExist:
raise ImportSiteError(_('Unknown map layer "%s"') % maplayer_slug)
return cell_data
def import_subobjects(self, cell_json):
if 'layers' not in cell_json:
return
for layer in cell_json['layers']:
layer['fields']['map_cell'] = self.pk
for layer in serializers.deserialize('json', json.dumps(cell_json['layers'])):
layer.save()
def duplicate_m2m(self, new_cell):
# set layers
for options in self.maplayeroptions_set.all():
new_options = copy.deepcopy(options)
new_options.pk = None
new_options.map_cell = new_cell
new_options.save()
class MapLayerOptions(models.Model):
map_cell = models.ForeignKey(Map, on_delete=models.CASCADE, db_column='map_id')
map_layer = models.ForeignKey(
MapLayer, verbose_name=_('Layer'), on_delete=models.CASCADE, db_column='maplayer_id'
)
opacity = models.FloatField(
verbose_name=_('Opacity'),
validators=[validators.MinValueValidator(0), validators.MaxValueValidator(1)],
null=True,
help_text=_('Float value between 0 (transparent) and 1 (opaque)'),
)
properties = models.CharField(
_('Properties'),
max_length=500,
blank=True,
help_text=_('List of properties to display in popup, separated by commas.'),
)
class Meta:
db_table = 'maps_map_layers'
unique_together = ('map_cell', 'map_layer')
def get_as_serialized_object(self):
serialized_options = json.loads(
serializers.serialize(
'json', [self], use_natural_foreign_keys=True, use_natural_primary_keys=True
)
)[0]
del serialized_options['fields']['map_cell']
del serialized_options['pk']
return serialized_options