Ajouter un uuid au model Page (#67710) #25

Merged
lguerin merged 7 commits from wip/67710-page-uuid into main 2023-02-01 18:18:39 +01:00
14 changed files with 790 additions and 143 deletions

View File

@ -168,8 +168,7 @@ class SearchCell(CellBase):
return cell_data return cell_data
for options in cell_data['fields']['_search_services']['options'].values(): for options in cell_data['fields']['_search_services']['options'].values():
if options.get('target_page'): if options.get('target_page'):
page_slug = options['target_page'].strip('/').split('/')[-1] options['target_page'] = Page.get_page_ids_by_uuids().get(options['target_page'])
options['target_page'] = Page.get_page_ids_by_slugs().get(page_slug) or ''
return cell_data return cell_data
def modify_global_context(self, context, request): def modify_global_context(self, context, request):

View File

@ -947,8 +947,7 @@ class WcsCardCell(CardMixin, CellBase):
custom_schema = cell_data['fields']['custom_schema'] custom_schema = cell_data['fields']['custom_schema']
for cell in custom_schema.get('cells') or []: for cell in custom_schema.get('cells') or []:
if cell.get('page'): if cell.get('page'):
page_slug = cell['page'].strip('/').split('/')[-1] cell['page'] = Page.get_page_ids_by_uuids().get(cell['page']) or ''
cell['page'] = Page.get_page_ids_by_slugs().get(page_slug) or ''
return cell_data return cell_data

View File

@ -0,0 +1,16 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data', '0059_textcell_title'),
]
operations = [
migrations.AddField(
model_name='page',
name='uuid',
field=models.UUIDField(editable=False, null=True),
),
]

View File

@ -0,0 +1,54 @@
import hashlib
import uuid
from django.db import migrations
def natural_key(page):

reprise de différentes méthodes du model Page, pour générer un uuid à partir de la chaine de slug

reprise de différentes méthodes du model Page, pour générer un uuid à partir de la chaine de slug
def get_parents_and_self(page):
pages = [page]
_page = page
while _page.parent_id:
_page = _page.parent
pages.append(_page)
return list(reversed(pages))
def get_online_url(page):
parts = [x.slug for x in get_parents_and_self(page)]
if parts[0] == 'index':
parts = parts[1:]
if not parts:
return '/'
return '/' + '/'.join(parts) + '/'
return get_online_url(page).strip('/')
def forward(apps, schema_editor):
Page = apps.get_model('data', 'Page')
known_uuids = set(Page.objects.filter(uuid__isnull=False).values_list('uuid', flat=True))
for page in Page.objects.filter(uuid__isnull=True):
if page.snapshot is not None:

Pour ma culture, dans quel cas page.snapshot est vide ?

Pour ma culture, dans quel cas page.snapshot est vide ?

Pour une page normale, page.snapshot est None.

Lorsqu'on fait "view" ou "export" sur un snapshot, ça génère une Page avec le snapshot attaché (qu'on ne sort dans le code classique, hors migration qui n'utilise pas les managers, que via le manager Page.snapshots)

Pour une page normale, page.snapshot est None. Lorsqu'on fait "view" ou "export" sur un snapshot, ça génère une Page avec le snapshot attaché (qu'on ne sort dans le code classique, hors migration qui n'utilise pas les managers, que via le manager Page.snapshots)
page.uuid = uuid.uuid4()
page.save()
known_uuids.add(page.uuid)
continue
slug = natural_key(page) or 'index'
slug_hash = hashlib.sha256(slug.encode('utf-8'))
page.uuid = uuid.UUID(slug_hash.hexdigest()[:32])
if page.uuid in known_uuids:
# uuid unicity !
page.uuid = uuid.uuid4()

au cas où, pour éviter les collisions et permettre à la migration suivante de passer (migration qui pose un unique=True sur uuid)

au cas où, pour éviter les collisions et permettre à la migration suivante de passer (migration qui pose un unique=True sur uuid)

Précaution digne du code d'un robot spatial :)

Précaution digne du code d'un robot spatial :)
known_uuids.add(page.uuid)
page.save()
class Migration(migrations.Migration):
dependencies = [
('data', '0060_page_uuid'),
]
operations = [
migrations.RunPython(forward, reverse_code=migrations.RunPython.noop),
]

View File

@ -0,0 +1,18 @@
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data', '0061_page_uuid'),
]
operations = [
migrations.AlterField(
model_name='page',
name='uuid',
field=models.UUIDField(default=uuid.uuid4, editable=False, unique=True),
),
]

View File

@ -0,0 +1,36 @@
from django.db import migrations
def forward(apps, schema_editor):

Pour nettoyer le code: au lieu de faire la migration au load d'un snapshot, pour les anciennes cellules card, migration.
On pourra vider la migration et virer les tests (transactionnels, c'est long) plus tard.

Pour nettoyer le code: au lieu de faire la migration au load d'un snapshot, pour les anciennes cellules card, migration. On pourra vider la migration et virer les tests (transactionnels, c'est long) plus tard.
PageSnapshot = apps.get_model('data', 'PageSnapshot')
for snapshot in PageSnapshot.objects.all():
changed = False
for cell in snapshot.serialization.get('cells') or []:
if cell.get('model') not in ['wcs.wcscardinfoscell', 'wcs.wcscardscell']:
continue
if cell['model'] == 'wcs.wcscardinfoscell':
cell['model'] = 'wcs.wcscardcell'
elif cell['model'] == 'wcs.wcscardscell':
cell['model'] = 'wcs.wcscardcell'
cell['fields']['card_ids'] = ''
cell['fields']['related_card_path'] = '__all__'
cell['fields']['display_mode'] = 'table'
cell['fields']['title_type'] = 'auto'
cell['fields']['custom_schema'] = {}
if cell['fields'].get('custom_title'):
cell['fields']['title_type'] = 'manual'
changed = True
if changed:
snapshot.save()
class Migration(migrations.Migration):
dependencies = [
('data', '0062_page_uuid'),
('wcs', '0053_new_card_cell'),
]
operations = [
migrations.RunPython(forward, reverse_code=migrations.RunPython.noop),
]

View File

@ -0,0 +1,102 @@
import uuid
from django.db import migrations
def get_page_uuid(page_uuids_by_slugs, slug, default_index=False):
try:
uuid.UUID(slug)
except (ValueError, AttributeError):
pass
else:
# it's a uuid, don't change it
lguerin marked this conversation as resolved Outdated

Qu'est-ce qui justifie ce bout ? 0063_old_card_cells juste avant ?

En tout cas si une page a le malheur d'avoir un slug qui ressemble à un uuid ça va mal marcher.

Qu'est-ce qui justifie ce bout ? 0063_old_card_cells juste avant ? En tout cas si une page a le malheur d'avoir un slug qui ressemble à un uuid ça va mal marcher.

Juste au cas où on ait passé la migration 0062 et redémarré le service avant de passer la 0064, pour une raison x ou y. Dans ce cas on commencera à trouver des uuid dans les snapshots, c'était pour que ça fonctionne.
(de même dans la 0061 je ne traite que les uuids manquants, en gardant un set des uuids trouvés et générés)
Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :)

Juste au cas où on ait passé la migration 0062 et redémarré le service avant de passer la 0064, pour une raison x ou y. Dans ce cas on commencera à trouver des uuid dans les snapshots, c'était pour que ça fonctionne. (de même dans la 0061 je ne traite que les uuids manquants, en gardant un set des uuids trouvés et générés) Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :)

Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :)

C'est peu probable j'en conviens mais ça paraît facile de s'en prémunir, par exemple en inversant les deux blocs pour que ce soit 1/ regarder le slug est dans page_uuids_by_slugs 2/ sinon, si c'est un uuid, le renvoyer, ou alors la version solide vraiment regarder si l'uuid est dans {page.uuid for page in pages} avant de le renvoyer.

Mais comme tu le sens :)

> Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :) C'est peu probable j'en conviens mais ça paraît facile de s'en prémunir, par exemple en inversant les deux blocs pour que ce soit 1/ regarder le slug est dans page_uuids_by_slugs 2/ sinon, si c'est un uuid, le renvoyer, ou alors la version solide vraiment regarder si l'uuid est dans {page.uuid for page in pages} avant de le renvoyer. Mais comme tu le sens :)
if slug in page_uuids_by_slugs.values():
return slug
slug = str(slug).strip('/').rsplit('/', maxsplit=1)[-1]
if not slug and default_index:
slug = 'index'
return page_uuids_by_slugs.get(slug)
def forward(apps, schema_editor):
PageSnapshot = apps.get_model('data', 'PageSnapshot')
Page = apps.get_model('data', 'Page')

Le slug d'une page n'étant pas unique, qu'est-ce qui donne confiance que ça marche ?

Le slug d'une page n'étant pas unique, qu'est-ce qui donne confiance que ça marche ?

Je me suis fait la même remarque, mais dans le code précédent, pour tout ce qui est parent, et référence dans une cellule, on se contente de prendre le dernier bout de la chaîne et de chercher la page correspondante. Si plusieurs slugs, ça pète.
Là ça marchera, mais ça ne prendra peut-être pas la bonne page.

Je me suis fait la même remarque, mais dans le code précédent, pour tout ce qui est parent, et référence dans une cellule, on se contente de prendre le dernier bout de la chaîne et de chercher la page correspondante. Si plusieurs slugs, ça pète. Là ça marchera, mais ça ne prendra peut-être pas la bonne page.

OK si le cas slug dupliqué provoque un crash on peut raisonnablement supposer que ce n'est pas la peine de s'embêter avec.

OK si le cas slug dupliqué provoque un crash on peut raisonnablement supposer que ce n'est pas la peine de s'embêter avec.
page_uuids_by_slugs = {page.slug: str(page.uuid) for page in Page.objects.only('uuid', 'slug')}
for snapshot in PageSnapshot.objects.all():
changed = False

non nécessaire si on fait une restoration de snapshot, mais si on veut juste exporter un snapshot pour le visualiser et peut-être l'importer ailleurs, on aura le bon parent dans l'export.

non nécessaire si on fait une restoration de snapshot, mais si on veut juste exporter un snapshot pour le visualiser et peut-être l'importer ailleurs, on aura le bon parent dans l'export.
if (snapshot.serialization.get('fields') or {}).get('parent'):
new_uuid = get_page_uuid(
page_uuids_by_slugs, snapshot.serialization['fields']['parent'][0], default_index=True
)
if new_uuid:
new_uuid = [new_uuid]
snapshot.serialization['fields']['parent'] = new_uuid
changed = True
for cell in snapshot.serialization.get('cells') or []:
if cell.get('model') not in [
'data.linkcell',
'data.linklistcell',
'search.searchcell',
'wcs.wcscardcell',
]:
continue
if cell['model'] == 'data.linkcell':
if not cell['fields'].get('link_page'):
continue
new_uuid = get_page_uuid(page_uuids_by_slugs, cell['fields']['link_page'][0])
if new_uuid:
new_uuid = [new_uuid]
cell['fields']['link_page'] = new_uuid
changed = True
elif cell['model'] == 'data.linklistcell':
for link in cell['fields'].get('links') or []:
if link.get('model') != 'data.linkcell':
continue
if not link['fields'].get('link_page'):
continue
new_uuid = get_page_uuid(page_uuids_by_slugs, link['fields']['link_page'][0])
if new_uuid:
new_uuid = [new_uuid]
link['fields']['link_page'] = new_uuid
changed = True
elif cell['model'] == 'search.searchcell':
if not cell['fields'].get('_search_services'):
continue
if not cell['fields']['_search_services'].get('options'):
continue
for option in cell['fields']['_search_services']['options'].values():
if not option:
continue
if not option.get('target_page'):
continue
option['target_page'] = get_page_uuid(page_uuids_by_slugs, option['target_page'])
changed = True
elif cell['model'] == 'wcs.wcscardcell':
if not cell['fields'].get('custom_schema'):
continue
for custom_cell in cell['fields']['custom_schema'].get('cells') or []:
if not custom_cell.get('page'):
continue
custom_cell['page'] = get_page_uuid(page_uuids_by_slugs, custom_cell['page'])
changed = True
if changed:
snapshot.save()
class Migration(migrations.Migration):
dependencies = [
('data', '0063_old_card_cells'),
]
operations = [
migrations.RunPython(forward, reverse_code=migrations.RunPython.noop),
]

View File

@ -24,6 +24,7 @@ import os
import re import re
import subprocess import subprocess
import urllib.parse import urllib.parse
import uuid
import feedparser import feedparser
import requests import requests
@ -179,9 +180,8 @@ class PageManager(models.Manager):
self.snapshots = kwargs.pop('snapshots', False) self.snapshots = kwargs.pop('snapshots', False)
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def get_by_natural_key(self, path): def get_by_natural_key(self, uuid):
parts = [x for x in path.strip('/').split('/') if x] or ['index'] return self.get(uuid=uuid)
return self.get(slug=parts[-1])
def get_queryset(self): def get_queryset(self):
queryset = super().get_queryset() queryset = super().get_queryset()
@ -195,6 +195,7 @@ class Page(models.Model):
objects = PageManager() objects = PageManager()
snapshots = PageManager(snapshots=True) snapshots = PageManager(snapshots=True)
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
title = models.CharField(_('Title'), max_length=150) title = models.CharField(_('Title'), max_length=150)
slug = models.SlugField(_('Slug')) slug = models.SlugField(_('Slug'))
sub_slug = models.CharField( sub_slug = models.CharField(
@ -262,12 +263,12 @@ class Page(models.Model):
return str(self.title) return str(self.title)
def natural_key(self): def natural_key(self):
return (self.get_online_url(follow_redirection=False).strip('/'),) return (str(self.uuid),)
@classmethod @classmethod
@utils.cache_during_request @utils.cache_during_request
def get_page_ids_by_slugs(cls): def get_page_ids_by_uuids(cls):
return {page.slug: page.pk for page in cls.objects.only('pk', 'slug')} return {str(page.uuid): page.pk for page in cls.objects.only('pk', 'uuid')}
def picture_extension(self): def picture_extension(self):
if not self.picture: if not self.picture:
@ -584,13 +585,13 @@ class Page(models.Model):
if page is None: if page is None:
qs_kwargs = {} qs_kwargs = {}
if snapshot: if snapshot:
qs_kwargs = {'snapshot': snapshot} qs_kwargs = {'snapshot': snapshot} # don't take uuid from snapshot: it has to be unique !
elif json_page['fields'].get('parent'): else:
qs_kwargs = {'parent__slug': json_page['fields']['parent'][0].split('/')[-1] or 'index'} qs_kwargs = {'uuid': json_page['fields']['uuid']}
page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'], **qs_kwargs) page, created = Page.objects.get_or_create(**qs_kwargs)
json_page['pk'] = page.id json_page['pk'] = page.id
parent_slug = json_page['fields'].get('parent') or [] parent_uuid = json_page['fields'].get('parent') or []
if parent_slug and not Page.objects.filter(slug=parent_slug[0].split('/')[-1] or 'index').exists(): if parent_uuid and not Page.objects.filter(uuid=parent_uuid[0]).exists():
# parent not found, remove it and exclude page from navigation # parent not found, remove it and exclude page from navigation
json_page['fields'].pop('parent') json_page['fields'].pop('parent')
json_page['fields']['exclude_from_navigation'] = True json_page['fields']['exclude_from_navigation'] = True
@ -602,8 +603,12 @@ class Page(models.Model):
) )
% json_page['fields']['title'], % json_page['fields']['title'],
) )
page_uuid = page.uuid
page = next(serializers.deserialize('json', json.dumps([json_page]), ignorenonexistent=True)) page = next(serializers.deserialize('json', json.dumps([json_page]), ignorenonexistent=True))
page.object.snapshot = snapshot page.object.snapshot = snapshot
if snapshot:
# keep the generated uuid
page.object.uuid = page_uuid
page.save() page.save()
for cell in json_page.get('cells'): for cell in json_page.get('cells'):
cell['fields']['groups'] = [[x] for x in cell['fields']['groups'] if isinstance(x, str)] cell['fields']['groups'] = [[x] for x in cell['fields']['groups'] if isinstance(x, str)]
@ -622,16 +627,6 @@ class Page(models.Model):
def load_serialized_cells(cls, cells): def load_serialized_cells(cls, cells):
# load new cells # load new cells
for cell_data in cells: for cell_data in cells:
# legacy card cells, for snapshots
if cell_data['model'] == 'wcs.wcscardinfoscell':
cell_data['model'] = 'wcs.wcscardcell'
elif cell_data['model'] == 'wcs.wcscardscell':
cell_data['model'] = 'wcs.wcscardcell'
cell_data['fields']['related_card_path'] = '__all__'
cell_data['fields']['display_mode'] = 'table'
cell_data['fields']['title_type'] = 'auto'
if cell_data['fields'].get('custom_title'):
cell_data['fields']['title_type'] = 'manual'

ici, le code déplacé dans une migration

ici, le code déplacé dans une migration
model = apps.get_model(cell_data['model']) model = apps.get_model(cell_data['model'])
cell_data = model.prepare_serialized_data(cell_data) cell_data = model.prepare_serialized_data(cell_data)
cell = list(serializers.deserialize('json', json.dumps([cell_data]), ignorenonexistent=True))[0] cell = list(serializers.deserialize('json', json.dumps([cell_data]), ignorenonexistent=True))[0]
@ -651,13 +646,7 @@ class Page(models.Model):
for json_page in json_site: for json_page in json_site:
# pre-create pages # pre-create pages
parent = None page, created = Page.objects.get_or_create(uuid=json_page['fields']['uuid'])
if json_page['fields'].get('parent'):
parent = json_page['fields']['parent'][0].split('/')[-1] or 'index'
page, created = Page.objects.get_or_create(
slug=json_page['fields']['slug'], parent__slug=parent
)
to_load.append((page, created, json_page)) to_load.append((page, created, json_page))
# delete cells of already existing pages # delete cells of already existing pages
@ -720,6 +709,8 @@ class Page(models.Model):
new_page.title = title or _('Copy of %s') % self.title new_page.title = title or _('Copy of %s') % self.title
# reset slug # reset slug
new_page.slug = None new_page.slug = None
# reset uuid
new_page.uuid = uuid.uuid4()
# reset snapshot # reset snapshot
new_page.snapshot = None new_page.snapshot = None
# set order # set order
@ -776,10 +767,12 @@ class PageSnapshot(models.Model):
def restore(self): def restore(self):
json_page = self.serialization json_page = self.serialization
# keep current page uuid
json_page['fields']['uuid'] = str(self.page.uuid)
# keep current page order # keep current page order
json_page['fields']['order'] = self.page.order json_page['fields']['order'] = self.page.order
# and current parent # and current parent
json_page['fields']['parent'] = [self.page.parent.slug] if self.page.parent else None json_page['fields']['parent'] = self.page.parent.natural_key() if self.page.parent else None
# and current exclude_from_navigation value # and current exclude_from_navigation value
json_page['fields']['exclude_from_navigation'] = self.page.exclude_from_navigation json_page['fields']['exclude_from_navigation'] = self.page.exclude_from_navigation
# restore snapshot # restore snapshot
@ -1717,8 +1710,7 @@ class LinkCell(CellBase):
@classmethod @classmethod
def prepare_serialized_data(cls, cell_data): def prepare_serialized_data(cls, cell_data):
if cell_data['fields'].get('link_page'): if cell_data['fields'].get('link_page'):
link_page_slug = cell_data['fields']['link_page'][0].strip('/').split('/')[-1] if cell_data['fields']['link_page'][0] not in Page.get_page_ids_by_uuids():
if link_page_slug not in Page.get_page_ids_by_slugs():
del cell_data['fields']['link_page'] del cell_data['fields']['link_page']
return cell_data return cell_data

View File

@ -659,7 +659,7 @@ def snapshot_export(request, *args, **kwargs):
page = snapshot.get_page() page = snapshot.get_page()
response = HttpResponse(content_type='application/json') response = HttpResponse(content_type='application/json')
response['Content-Disposition'] = 'attachment; filename="export_page_{}_{}.json"'.format( response['Content-Disposition'] = 'attachment; filename="export_page_{}_{}.json"'.format(
snapshot.get_page().slug, snapshot.timestamp.strftime('%Y%m%d') page.slug, snapshot.timestamp.strftime('%Y%m%d')
) )
json.dump(page.get_serialized_page(), response, indent=2) json.dump(page.get_serialized_page(), response, indent=2)
return response return response

View File

@ -5,6 +5,7 @@ import shutil
import sys import sys
import tarfile import tarfile
import tempfile import tempfile
import uuid
from io import BytesIO, StringIO from io import BytesIO, StringIO
import pytest import pytest
@ -97,7 +98,7 @@ def test_import_export(app, some_data):
def test_import_export_with_parent(app, some_data): def test_import_export_with_parent(app, some_data):
output = get_output_of_command('export_site') output = get_output_of_command('export_site')
payload = json.loads(output) payload = json.loads(output)
payload['pages'][1]['fields']['parent'] = ['one'] payload['pages'][1]['fields']['parent'] = [str(Page.objects.get(slug='one').uuid)]
Page.objects.all().delete() Page.objects.all().delete()
import_site(data=payload) import_site(data=payload)
@ -111,7 +112,7 @@ def test_import_export_with_unknown_parent(app, some_data):
output = get_output_of_command('export_site') output = get_output_of_command('export_site')
payload = json.loads(output) payload = json.loads(output)
payload['pages'][0]['fields']['exclude_from_navigation'] = False payload['pages'][0]['fields']['exclude_from_navigation'] = False
payload['pages'][0]['fields']['parent'] = ['unknown-parent'] payload['pages'][0]['fields']['parent'] = [str(uuid.uuid4())]
Page.objects.all().delete() Page.objects.all().delete()
import_site(data=payload) import_site(data=payload)

View File

@ -5,6 +5,7 @@ import os
import re import re
import shutil import shutil
import urllib.parse import urllib.parse
import uuid
from io import BytesIO from io import BytesIO
from unittest import mock from unittest import mock
@ -925,12 +926,16 @@ def test_export_page_snapshot(freezer, app, admin_user):
history = app.get('/manage/pages/%s/history' % page.id, status=200) history = app.get('/manage/pages/%s/history' % page.id, status=200)
resp = history.click('export', index=0, verbose=True) resp = history.click('export', index=0, verbose=True)
snapshot_page = Page.snapshots.latest('pk')
assert snapshot_page.uuid != page.uuid
assert resp.headers['content-type'] == 'application/json' assert resp.headers['content-type'] == 'application/json'
assert resp.headers['content-disposition'] == 'attachment; filename="export_page_one_20200717.json"' assert resp.headers['content-disposition'] == 'attachment; filename="export_page_one_20200717.json"'
assert resp.json['fields']['title'] == 'Updated Title' assert resp.json['fields']['title'] == 'Updated Title'
resp = history.click('export', index=1, verbose=True) resp = history.click('export', index=1, verbose=True)
snapshot_page = Page.snapshots.latest('pk')
assert snapshot_page.uuid != page.uuid
assert resp.headers['content-type'] == 'application/json' assert resp.headers['content-type'] == 'application/json'
assert resp.headers['content-disposition'] == 'attachment; filename="export_page_one_20200716.json"' assert resp.headers['content-disposition'] == 'attachment; filename="export_page_one_20200716.json"'
@ -1001,7 +1006,7 @@ def test_export_page_with_redirection(app, admin_user):
resp = app.get('/manage/pages/%s/' % page1.pk) resp = app.get('/manage/pages/%s/' % page1.pk)
resp = resp.click('Export') resp = resp.click('Export')
assert resp.json['pages'][0]['cells'][0]['fields']['link_page'] == [ assert resp.json['pages'][0]['cells'][0]['fields']['link_page'] == [
'two' str(page2.uuid)
] # and not http://www.example.net ] # and not http://www.example.net
@ -1041,7 +1046,7 @@ def test_site_export_import_json(app, admin_user):
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json') resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json')
with CaptureQueriesContext(connection) as ctx: with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit() resp = resp.form.submit()
assert len(ctx.captured_queries) in [304, 305] assert len(ctx.captured_queries) in [295, 296]
assert Page.objects.count() == 4 assert Page.objects.count() == 4
assert PageSnapshot.objects.all().count() == 4 assert PageSnapshot.objects.all().count() == 4
@ -1052,7 +1057,7 @@ def test_site_export_import_json(app, admin_user):
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json') resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json')
with CaptureQueriesContext(connection) as ctx: with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit() resp = resp.form.submit()
assert len(ctx.captured_queries) == 274 assert len(ctx.captured_queries) == 276
assert set(Page.objects.get(slug='one').related_cells['cell_types']) == {'data_textcell', 'data_linkcell'} assert set(Page.objects.get(slug='one').related_cells['cell_types']) == {'data_textcell', 'data_linkcell'}
assert Page.objects.count() == 4 assert Page.objects.count() == 4
assert LinkCell.objects.count() == 2 assert LinkCell.objects.count() == 2
@ -1172,7 +1177,7 @@ def test_site_export_import_unknown_parent(app, admin_user):
resp = resp.form.submit() resp = resp.form.submit()
payload = json.loads(force_str(resp.body)) payload = json.loads(force_str(resp.body))
payload['pages'][0]['fields']['exclude_from_navigation'] = False payload['pages'][0]['fields']['exclude_from_navigation'] = False
payload['pages'][0]['fields']['parent'] = ['unknown-parent'] payload['pages'][0]['fields']['parent'] = [str(uuid.uuid4())]
resp = app.get('/manage/') resp = app.get('/manage/')
resp = resp.click('Import Site') resp = resp.click('Import Site')
@ -1193,13 +1198,15 @@ def test_site_export_import_unknown_page(app, admin_user):
resp = app.get('/manage/site-export') resp = app.get('/manage/site-export')
resp = resp.form.submit() resp = resp.form.submit()
payload = json.loads(force_str(resp.body)) payload = json.loads(force_str(resp.body))
payload['pages'][0]['cells'][0]['fields']['root_page'] = ['unknown-parent'] payload['pages'][0]['cells'][0]['fields']['root_page'] = [str(uuid.uuid4())]
Page.objects.all().delete() Page.objects.all().delete()
resp = app.get('/manage/site-import') resp = app.get('/manage/site-import')
resp.form['site_file'] = Upload('site-export.json', force_bytes(json.dumps(payload)), 'application/json') resp.form['site_file'] = Upload('site-export.json', force_bytes(json.dumps(payload)), 'application/json')
resp = resp.form.submit() resp = resp.form.submit()
assert resp.context['form'].errors['site_file'] == ['Unknown page "unknown-parent".'] assert resp.context['form'].errors['site_file'] == [
'Unknown page "%s".' % payload['pages'][0]['cells'][0]['fields']['root_page'][0]
]
assert Page.objects.count() == 0 assert Page.objects.count() == 0
resp = app.get('/manage/site-import') resp = app.get('/manage/site-import')
@ -1307,6 +1314,7 @@ def test_duplicate_page(app, admin_user):
in resp.text in resp.text
) )
assert new_page.exclude_from_navigation is True assert new_page.exclude_from_navigation is True
assert new_page.uuid != page.uuid
page.exclude_from_navigation = True page.exclude_from_navigation = True
page.save() page.save()
@ -2470,9 +2478,6 @@ def test_page_discover_placeholder_with_error_cells(app, admin_user):
def test_page_versionning(app, admin_user): def test_page_versionning(app, admin_user):
Page.objects.all().delete()
PageSnapshot.objects.all()
page = Page(title='One', slug='one') page = Page(title='One', slug='one')
page.save() page.save()
@ -2517,8 +2522,10 @@ def test_page_versionning(app, admin_user):
with CaptureQueriesContext(connection) as ctx: with CaptureQueriesContext(connection) as ctx:
resp2 = resp.click('view', index=1) resp2 = resp.click('view', index=1)
assert len(ctx.captured_queries) == 70 assert len(ctx.captured_queries) == 72
assert Page.snapshots.latest('pk').related_cells == {'cell_types': ['data_textcell']} assert Page.snapshots.latest('pk').related_cells == {'cell_types': ['data_textcell']}
snapshot_page = Page.snapshots.latest('pk')
assert snapshot_page.uuid != page.uuid
assert resp2.text.index('Hello world') < resp2.text.index('Foobar3') assert resp2.text.index('Hello world') < resp2.text.index('Foobar3')
resp2 = resp.click('view', index=0) resp2 = resp.click('view', index=0)
@ -2616,6 +2623,8 @@ def test_restore_page_attributes(app):
snapshot = PageSnapshot.objects.latest('pk') snapshot = PageSnapshot.objects.latest('pk')
# move page2, remove page 1 # move page2, remove page 1
new_uuid = uuid.uuid4()
page2.uuid = new_uuid
page2.parent = page4 page2.parent = page4
page2.order = 3 page2.order = 3
page2.save() page2.save()
@ -2635,6 +2644,7 @@ def test_restore_page_attributes(app):
# check some attributes # check some attributes
assert page2.order == 3 assert page2.order == 3
assert page2.parent == page4 assert page2.parent == page4
assert page2.uuid == new_uuid
assert page2.exclude_from_navigation is False assert page2.exclude_from_navigation is False
PageSnapshot.take(page2) PageSnapshot.take(page2)

504
tests/test_migrations.py Normal file
View File

@ -0,0 +1,504 @@
import uuid
from django.db import connection
from django.db.migrations.executor import MigrationExecutor
def test_page_snapshot_with_old_card_cells_migration(transactional_db):
migrate_from = [('data', '0062_page_uuid')]
migrate_to = [('data', '0063_old_card_cells')]
executor = MigrationExecutor(connection)
old_apps = executor.loader.project_state(migrate_from).apps
executor.migrate(migrate_from)
pagesnapshot_class = old_apps.get_model('data', 'PageSnapshot')
snapshot = pagesnapshot_class.objects.create(
serialization={
'cells': [
{
'model': 'wcs.wcscardinfoscell',
'fields': {
'card_ids': '42,35',
'title_type': 'manual',
'custom_title': 'my-title',
'display_mode': 'card',
'without_user': True,
'custom_schema': {},
'only_for_user': True,
'carddef_reference': 'default:card_model_1',
'related_card_path': '',
'limit': 42,
'slug': 'my-card',
'order': 1,
'groups': [],
'public': True,
'condition': 'my-condition',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-11T13:57:43.362Z',
'restricted_to_unlogged': False,
},
},
{
'model': 'wcs.wcscardscell',
'fields': {
'custom_title': 'my-other-title',
'without_user': False,
'only_for_user': False,
'carddef_reference': 'default:card_model_1',
'limit': 35,
'slug': 'my-other-card',
'order': 2,
'groups': [],
'public': True,
'condition': '',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-12T07:19:18.541Z',
'restricted_to_unlogged': False,
},
},
],
}
)
executor = MigrationExecutor(connection)
executor.migrate(migrate_to)
executor.loader.build_graph()
apps = executor.loader.project_state(migrate_to).apps
pagesnapshot_class = apps.get_model('data', 'PageSnapshot')
snapshot = pagesnapshot_class.objects.get()
assert snapshot.serialization['cells'][0] == {
'model': 'wcs.wcscardcell',
'fields': {
'card_ids': '42,35',
'title_type': 'manual',
'custom_title': 'my-title',
'display_mode': 'card',
'without_user': True,
'custom_schema': {},
'only_for_user': True,
'carddef_reference': 'default:card_model_1',
'related_card_path': '',
'limit': 42,
'slug': 'my-card',
'order': 1,
'groups': [],
'public': True,
'condition': 'my-condition',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-11T13:57:43.362Z',
'restricted_to_unlogged': False,
},
}
assert snapshot.serialization['cells'][1] == {
'model': 'wcs.wcscardcell',
'fields': {
'card_ids': '',
'title_type': 'manual',
'custom_title': 'my-other-title',
'display_mode': 'table',
'without_user': False,
'custom_schema': {},
'only_for_user': False,
'carddef_reference': 'default:card_model_1',
'related_card_path': '__all__',
'limit': 35,
'slug': 'my-other-card',
'order': 2,
'groups': [],
'public': True,
'condition': '',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-12T07:19:18.541Z',
'restricted_to_unlogged': False,
},
}
def test_page_snapshot_uuids_migration(transactional_db):
migrate_from = [('data', '0063_old_card_cells')]
migrate_to = [('data', '0064_snapshot_uuids')]
executor = MigrationExecutor(connection)
old_apps = executor.loader.project_state(migrate_from).apps
executor.migrate(migrate_from)
page_class = old_apps.get_model('data', 'Page')
pagesnapshot_class = old_apps.get_model('data', 'PageSnapshot')
root = page_class.objects.create(order=0, slug='index')
page = page_class.objects.create(order=1, slug='slug')
old_uuid = uuid.uuid4()
link_cells = [
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(old_uuid)],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)], # will not change
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': ['unknown'],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': ['slug'],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': ['foo/bar/slug'],
},
},
]
snapshot1 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': None,
},
'cells': link_cells
+ [
{
'model': 'data.linklistcell',
'fields': {
'links': link_cells,
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(old_uuid),
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid), # will not change
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': 'unknown',
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': 'slug',
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': 'foo/bar/slug',
}
}
}
},
},
{
'model': 'wcs.wcscardcell',
'fields': {
'custom_schema': {
'cells': [
{
'page': None,
},
{
'page': str(old_uuid),
},
{
'page': str(page.uuid), # will not change
},
{
'page': 'unknown',
},
{
'page': 'slug',
},
{
'page': 'foo/bar/slug',
},
],
},
},
},
],
}
)
snapshot2 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': [str(old_uuid)],
}
}
)
snapshot3 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': [str(page.uuid)], # will not change
}
}
)
snapshot4 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': ['unknown'],
}
}
)
snapshot5 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': ['slug'],
}
}
)
snapshot6 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': ['foo/bar/slug'],
}
}
)
snapshot7 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': [''], # index !
}
}
)
executor = MigrationExecutor(connection)
executor.migrate(migrate_to)
executor.loader.build_graph()
apps = executor.loader.project_state(migrate_to).apps
pagesnapshot_class = apps.get_model('data', 'PageSnapshot')
snapshot1 = pagesnapshot_class.objects.get(pk=snapshot1.pk)
assert snapshot1.serialization['fields']['parent'] is None
new_link_cells = [
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)],
},
},
]
assert snapshot1.serialization['cells'] == new_link_cells + [
{
'model': 'data.linklistcell',
'fields': {
'links': new_link_cells,
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid),
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid),
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid),
}
}
}
},
},
{
'model': 'wcs.wcscardcell',
'fields': {
'custom_schema': {
'cells': [
{
'page': None,
},
{
'page': None,
},
{
'page': str(page.uuid),
},
{
'page': None,
},
{
'page': str(page.uuid),
},
{
'page': str(page.uuid),
},
],
},
},
},
]
snapshot2 = pagesnapshot_class.objects.get(pk=snapshot2.pk)
assert snapshot2.serialization['fields']['parent'] is None
snapshot3 = pagesnapshot_class.objects.get(pk=snapshot3.pk)
assert snapshot3.serialization['fields']['parent'] == [str(page.uuid)]
snapshot4 = pagesnapshot_class.objects.get(pk=snapshot4.pk)
assert snapshot4.serialization['fields']['parent'] is None
snapshot5 = pagesnapshot_class.objects.get(pk=snapshot5.pk)
assert snapshot5.serialization['fields']['parent'] == [str(page.uuid)]
snapshot6 = pagesnapshot_class.objects.get(pk=snapshot6.pk)
assert snapshot6.serialization['fields']['parent'] == [str(page.uuid)]
snapshot7 = pagesnapshot_class.objects.get(pk=snapshot7.pk)
assert snapshot7.serialization['fields']['parent'] == [str(root.uuid)]

View File

@ -2,6 +2,7 @@ import datetime
import json import json
import os import os
import sys import sys
import uuid
from io import StringIO from io import StringIO
from unittest import mock from unittest import mock
@ -1689,12 +1690,9 @@ def test_test_export_import_search_cell_with_target_page():
site_export = get_output_of_command('export_site') site_export = get_output_of_command('export_site')
site_data = json.loads(site_export) site_data = json.loads(site_export)
assert len(site_data['pages']) == 3 assert len(site_data['pages']) == 3
assert ( assert site_data['pages'][-1]['cells'][0]['fields']['_search_services']['options'][
site_data['pages'][-1]['cells'][0]['fields']['_search_services']['options'][ 'cards:c21f969b:card-bar'
'cards:c21f969b:card-bar' ]['target_page'] == str(card_page.uuid)
]['target_page']
== 'root/card'
)
import_site(data={}, clean=True) import_site(data={}, clean=True)
assert Page.objects.all().count() == 0 assert Page.objects.all().count() == 0
@ -1717,8 +1715,8 @@ def test_test_export_import_search_cell_with_target_page():
site_data['pages'][-1]['cells'][0]['fields']['_search_services']['options']['cards:c21f969b:card-bar'][ site_data['pages'][-1]['cells'][0]['fields']['_search_services']['options']['cards:c21f969b:card-bar'][
'target_page' 'target_page'
] = 'unknown' ] = str(uuid.uuid4())
import_site(data=site_data, clean=True) import_site(data=site_data, clean=True)
new_card_page = Page.objects.get(slug='card') new_card_page = Page.objects.get(slug='card')
new_cell = SearchCell.objects.get() new_cell = SearchCell.objects.get()
assert new_cell._search_services['options']['cards:c21f969b:card-bar']['target_page'] == '' assert new_cell._search_services['options']['cards:c21f969b:card-bar']['target_page'] is None

View File

@ -17,7 +17,7 @@ from requests.models import Response
from combo.apps.wcs.forms import WcsCardCellDisplayForm from combo.apps.wcs.forms import WcsCardCellDisplayForm
from combo.apps.wcs.models import WcsCardCell from combo.apps.wcs.models import WcsCardCell
from combo.data.models import Page, PageSnapshot, TextCell, ValidityInfo from combo.data.models import Page, TextCell, ValidityInfo
from combo.data.utils import import_site from combo.data.utils import import_site
from tests.test_manager import login from tests.test_manager import login
from tests.utils import manager_submit_cell from tests.utils import manager_submit_cell
@ -3264,90 +3264,6 @@ def test_card_cell_assets(mock_send, settings, app, admin_user):
assert 'Logo — %s' % cell2.get_label_for_asset() in resp.text assert 'Logo — %s' % cell2.get_label_for_asset() in resp.text
def test_page_snapshot_with_old_card_cells():
page = Page.objects.create(title='xxx', slug='test_snapshots', template_name='standard')
PageSnapshot.take(page)
snapshot = page.pagesnapshot_set.first()
snapshot.serialization['cells'].append(
{
'model': 'wcs.wcscardinfoscell',
'fields': {
'slug': 'my-card',
'limit': 42,
'order': 1,
'groups': [],
'public': True,
'card_ids': '42,35',
'condition': 'my-condition',
'title_type': 'manual',
'placeholder': 'content',
'custom_title': 'my-title',
'display_mode': 'card',
'without_user': True,
'custom_schema': {},
'only_for_user': True,
'template_name': None,
'extra_css_class': '',
'carddef_reference': 'default:card_model_1',
'related_card_path': '',
'last_update_timestamp': '2022-08-11T13:57:43.362Z',
'restricted_to_unlogged': False,
'page': page.pk,
},
}
)
snapshot.serialization['cells'].append(
{
'model': 'wcs.wcscardscell',
'fields': {
'slug': 'my-other-card',
'limit': 35,
'order': 2,
'groups': [],
'public': True,
'condition': '',
'placeholder': 'content',
'custom_title': 'my-other-title',
'without_user': False,
'only_for_user': False,
'template_name': None,
'extra_css_class': '',
'carddef_reference': 'default:card_model_1',
'last_update_timestamp': '2022-08-12T07:19:18.541Z',
'restricted_to_unlogged': False,
'page': page.pk,
},
}
)
old_page = snapshot.get_page()
cell1 = old_page.get_cells()[0]
assert isinstance(cell1, WcsCardCell)
assert cell1.slug == 'my-card'
assert cell1.limit == 42
assert cell1.card_ids == '42,35'
assert cell1.title_type == 'manual'
assert cell1.custom_title == 'my-title'
assert cell1.display_mode == 'card'
assert cell1.without_user is True
assert cell1.custom_schema == {}
assert cell1.only_for_user is True
assert cell1.carddef_reference == 'default:card_model_1'
assert cell1.related_card_path == ''
cell2 = old_page.get_cells()[1]
assert isinstance(cell1, WcsCardCell)
assert cell2.slug == 'my-other-card'
assert cell2.limit == 35
assert cell2.card_ids == ''
assert cell2.title_type == 'manual'
assert cell2.custom_title == 'my-other-title'
assert cell2.display_mode == 'table'
assert cell2.without_user is False
assert cell2.custom_schema == {}
assert cell2.only_for_user is False
assert cell2.carddef_reference == 'default:card_model_1'
assert cell2.related_card_path == '__all__'
def get_output_of_command(command, *args, **kwargs): def get_output_of_command(command, *args, **kwargs):
old_stdout = sys.stdout old_stdout = sys.stdout
output = sys.stdout = StringIO() output = sys.stdout = StringIO()
@ -3387,7 +3303,9 @@ def test_export_import_card_cell_with_page_link():
site_export = get_output_of_command('export_site') site_export = get_output_of_command('export_site')
site_data = json.loads(site_export) site_data = json.loads(site_export)
assert len(site_data['pages']) == 3 assert len(site_data['pages']) == 3
assert site_data['pages'][-1]['cells'][0]['fields']['custom_schema']['cells'][0]['page'] == 'root/card' assert site_data['pages'][-1]['cells'][0]['fields']['custom_schema']['cells'][0]['page'] == str(
card_page.uuid
)
import_site(data={}, clean=True) import_site(data={}, clean=True)
assert Page.objects.all().count() == 0 assert Page.objects.all().count() == 0