Ajouter un uuid au model Page (#67710) #25

Merged
lguerin merged 7 commits from wip/67710-page-uuid into main 2023-02-01 18:18:39 +01:00
14 changed files with 790 additions and 143 deletions

View File

@ -168,8 +168,7 @@ class SearchCell(CellBase):
return cell_data
for options in cell_data['fields']['_search_services']['options'].values():
if options.get('target_page'):
page_slug = options['target_page'].strip('/').split('/')[-1]
options['target_page'] = Page.get_page_ids_by_slugs().get(page_slug) or ''
options['target_page'] = Page.get_page_ids_by_uuids().get(options['target_page'])
return cell_data
def modify_global_context(self, context, request):

View File

@ -947,8 +947,7 @@ class WcsCardCell(CardMixin, CellBase):
custom_schema = cell_data['fields']['custom_schema']
for cell in custom_schema.get('cells') or []:
if cell.get('page'):
page_slug = cell['page'].strip('/').split('/')[-1]
cell['page'] = Page.get_page_ids_by_slugs().get(page_slug) or ''
cell['page'] = Page.get_page_ids_by_uuids().get(cell['page']) or ''
return cell_data

View File

@ -0,0 +1,16 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data', '0059_textcell_title'),
]
operations = [
migrations.AddField(
model_name='page',
name='uuid',
field=models.UUIDField(editable=False, null=True),
),
]

View File

@ -0,0 +1,54 @@
import hashlib
import uuid
from django.db import migrations
def natural_key(page):

reprise de différentes méthodes du model Page, pour générer un uuid à partir de la chaine de slug

reprise de différentes méthodes du model Page, pour générer un uuid à partir de la chaine de slug
def get_parents_and_self(page):
pages = [page]
_page = page
while _page.parent_id:
_page = _page.parent
pages.append(_page)
return list(reversed(pages))
def get_online_url(page):
parts = [x.slug for x in get_parents_and_self(page)]
if parts[0] == 'index':
parts = parts[1:]
if not parts:
return '/'
return '/' + '/'.join(parts) + '/'
return get_online_url(page).strip('/')
def forward(apps, schema_editor):
Page = apps.get_model('data', 'Page')
known_uuids = set(Page.objects.filter(uuid__isnull=False).values_list('uuid', flat=True))
for page in Page.objects.filter(uuid__isnull=True):
if page.snapshot is not None:

Pour ma culture, dans quel cas page.snapshot est vide ?

Pour ma culture, dans quel cas page.snapshot est vide ?

Pour une page normale, page.snapshot est None.

Lorsqu'on fait "view" ou "export" sur un snapshot, ça génère une Page avec le snapshot attaché (qu'on ne sort dans le code classique, hors migration qui n'utilise pas les managers, que via le manager Page.snapshots)

Pour une page normale, page.snapshot est None. Lorsqu'on fait "view" ou "export" sur un snapshot, ça génère une Page avec le snapshot attaché (qu'on ne sort dans le code classique, hors migration qui n'utilise pas les managers, que via le manager Page.snapshots)
page.uuid = uuid.uuid4()
page.save()
known_uuids.add(page.uuid)
continue
slug = natural_key(page) or 'index'
slug_hash = hashlib.sha256(slug.encode('utf-8'))
page.uuid = uuid.UUID(slug_hash.hexdigest()[:32])
if page.uuid in known_uuids:
# uuid unicity !
page.uuid = uuid.uuid4()

au cas où, pour éviter les collisions et permettre à la migration suivante de passer (migration qui pose un unique=True sur uuid)

au cas où, pour éviter les collisions et permettre à la migration suivante de passer (migration qui pose un unique=True sur uuid)

Précaution digne du code d'un robot spatial :)

Précaution digne du code d'un robot spatial :)
known_uuids.add(page.uuid)
page.save()
class Migration(migrations.Migration):
dependencies = [
('data', '0060_page_uuid'),
]
operations = [
migrations.RunPython(forward, reverse_code=migrations.RunPython.noop),
]

View File

@ -0,0 +1,18 @@
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('data', '0061_page_uuid'),
]
operations = [
migrations.AlterField(
model_name='page',
name='uuid',
field=models.UUIDField(default=uuid.uuid4, editable=False, unique=True),
),
]

View File

@ -0,0 +1,36 @@
from django.db import migrations
def forward(apps, schema_editor):

Pour nettoyer le code: au lieu de faire la migration au load d'un snapshot, pour les anciennes cellules card, migration.
On pourra vider la migration et virer les tests (transactionnels, c'est long) plus tard.

Pour nettoyer le code: au lieu de faire la migration au load d'un snapshot, pour les anciennes cellules card, migration. On pourra vider la migration et virer les tests (transactionnels, c'est long) plus tard.
PageSnapshot = apps.get_model('data', 'PageSnapshot')
for snapshot in PageSnapshot.objects.all():
changed = False
for cell in snapshot.serialization.get('cells') or []:
if cell.get('model') not in ['wcs.wcscardinfoscell', 'wcs.wcscardscell']:
continue
if cell['model'] == 'wcs.wcscardinfoscell':
cell['model'] = 'wcs.wcscardcell'
elif cell['model'] == 'wcs.wcscardscell':
cell['model'] = 'wcs.wcscardcell'
cell['fields']['card_ids'] = ''
cell['fields']['related_card_path'] = '__all__'
cell['fields']['display_mode'] = 'table'
cell['fields']['title_type'] = 'auto'
cell['fields']['custom_schema'] = {}
if cell['fields'].get('custom_title'):
cell['fields']['title_type'] = 'manual'
changed = True
if changed:
snapshot.save()
class Migration(migrations.Migration):
dependencies = [
('data', '0062_page_uuid'),
('wcs', '0053_new_card_cell'),
]
operations = [
migrations.RunPython(forward, reverse_code=migrations.RunPython.noop),
]

View File

@ -0,0 +1,102 @@
import uuid
from django.db import migrations
def get_page_uuid(page_uuids_by_slugs, slug, default_index=False):
try:
uuid.UUID(slug)
except (ValueError, AttributeError):
pass
else:
# it's a uuid, don't change it
lguerin marked this conversation as resolved Outdated

Qu'est-ce qui justifie ce bout ? 0063_old_card_cells juste avant ?

En tout cas si une page a le malheur d'avoir un slug qui ressemble à un uuid ça va mal marcher.

Qu'est-ce qui justifie ce bout ? 0063_old_card_cells juste avant ? En tout cas si une page a le malheur d'avoir un slug qui ressemble à un uuid ça va mal marcher.

Juste au cas où on ait passé la migration 0062 et redémarré le service avant de passer la 0064, pour une raison x ou y. Dans ce cas on commencera à trouver des uuid dans les snapshots, c'était pour que ça fonctionne.
(de même dans la 0061 je ne traite que les uuids manquants, en gardant un set des uuids trouvés et générés)
Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :)

Juste au cas où on ait passé la migration 0062 et redémarré le service avant de passer la 0064, pour une raison x ou y. Dans ce cas on commencera à trouver des uuid dans les snapshots, c'était pour que ça fonctionne. (de même dans la 0061 je ne traite que les uuids manquants, en gardant un set des uuids trouvés et générés) Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :)

Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :)

C'est peu probable j'en conviens mais ça paraît facile de s'en prémunir, par exemple en inversant les deux blocs pour que ce soit 1/ regarder le slug est dans page_uuids_by_slugs 2/ sinon, si c'est un uuid, le renvoyer, ou alors la version solide vraiment regarder si l'uuid est dans {page.uuid for page in pages} avant de le renvoyer.

Mais comme tu le sens :)

> Tu penses qu'on pourrait avoir des slugs qui sont des uuids ? :) C'est peu probable j'en conviens mais ça paraît facile de s'en prémunir, par exemple en inversant les deux blocs pour que ce soit 1/ regarder le slug est dans page_uuids_by_slugs 2/ sinon, si c'est un uuid, le renvoyer, ou alors la version solide vraiment regarder si l'uuid est dans {page.uuid for page in pages} avant de le renvoyer. Mais comme tu le sens :)
if slug in page_uuids_by_slugs.values():
return slug
slug = str(slug).strip('/').rsplit('/', maxsplit=1)[-1]
if not slug and default_index:
slug = 'index'
return page_uuids_by_slugs.get(slug)
def forward(apps, schema_editor):
PageSnapshot = apps.get_model('data', 'PageSnapshot')
Page = apps.get_model('data', 'Page')

Le slug d'une page n'étant pas unique, qu'est-ce qui donne confiance que ça marche ?

Le slug d'une page n'étant pas unique, qu'est-ce qui donne confiance que ça marche ?

Je me suis fait la même remarque, mais dans le code précédent, pour tout ce qui est parent, et référence dans une cellule, on se contente de prendre le dernier bout de la chaîne et de chercher la page correspondante. Si plusieurs slugs, ça pète.
Là ça marchera, mais ça ne prendra peut-être pas la bonne page.

Je me suis fait la même remarque, mais dans le code précédent, pour tout ce qui est parent, et référence dans une cellule, on se contente de prendre le dernier bout de la chaîne et de chercher la page correspondante. Si plusieurs slugs, ça pète. Là ça marchera, mais ça ne prendra peut-être pas la bonne page.

OK si le cas slug dupliqué provoque un crash on peut raisonnablement supposer que ce n'est pas la peine de s'embêter avec.

OK si le cas slug dupliqué provoque un crash on peut raisonnablement supposer que ce n'est pas la peine de s'embêter avec.
page_uuids_by_slugs = {page.slug: str(page.uuid) for page in Page.objects.only('uuid', 'slug')}
for snapshot in PageSnapshot.objects.all():
changed = False

non nécessaire si on fait une restoration de snapshot, mais si on veut juste exporter un snapshot pour le visualiser et peut-être l'importer ailleurs, on aura le bon parent dans l'export.

non nécessaire si on fait une restoration de snapshot, mais si on veut juste exporter un snapshot pour le visualiser et peut-être l'importer ailleurs, on aura le bon parent dans l'export.
if (snapshot.serialization.get('fields') or {}).get('parent'):
new_uuid = get_page_uuid(
page_uuids_by_slugs, snapshot.serialization['fields']['parent'][0], default_index=True
)
if new_uuid:
new_uuid = [new_uuid]
snapshot.serialization['fields']['parent'] = new_uuid
changed = True
for cell in snapshot.serialization.get('cells') or []:
if cell.get('model') not in [
'data.linkcell',
'data.linklistcell',
'search.searchcell',
'wcs.wcscardcell',
]:
continue
if cell['model'] == 'data.linkcell':
if not cell['fields'].get('link_page'):
continue
new_uuid = get_page_uuid(page_uuids_by_slugs, cell['fields']['link_page'][0])
if new_uuid:
new_uuid = [new_uuid]
cell['fields']['link_page'] = new_uuid
changed = True
elif cell['model'] == 'data.linklistcell':
for link in cell['fields'].get('links') or []:
if link.get('model') != 'data.linkcell':
continue
if not link['fields'].get('link_page'):
continue
new_uuid = get_page_uuid(page_uuids_by_slugs, link['fields']['link_page'][0])
if new_uuid:
new_uuid = [new_uuid]
link['fields']['link_page'] = new_uuid
changed = True
elif cell['model'] == 'search.searchcell':
if not cell['fields'].get('_search_services'):
continue
if not cell['fields']['_search_services'].get('options'):
continue
for option in cell['fields']['_search_services']['options'].values():
if not option:
continue
if not option.get('target_page'):
continue
option['target_page'] = get_page_uuid(page_uuids_by_slugs, option['target_page'])
changed = True
elif cell['model'] == 'wcs.wcscardcell':
if not cell['fields'].get('custom_schema'):
continue
for custom_cell in cell['fields']['custom_schema'].get('cells') or []:
if not custom_cell.get('page'):
continue
custom_cell['page'] = get_page_uuid(page_uuids_by_slugs, custom_cell['page'])
changed = True
if changed:
snapshot.save()
class Migration(migrations.Migration):
dependencies = [
('data', '0063_old_card_cells'),
]
operations = [
migrations.RunPython(forward, reverse_code=migrations.RunPython.noop),
]

View File

@ -24,6 +24,7 @@ import os
import re
import subprocess
import urllib.parse
import uuid
import feedparser
import requests
@ -179,9 +180,8 @@ class PageManager(models.Manager):
self.snapshots = kwargs.pop('snapshots', False)
super().__init__(*args, **kwargs)
def get_by_natural_key(self, path):
parts = [x for x in path.strip('/').split('/') if x] or ['index']
return self.get(slug=parts[-1])
def get_by_natural_key(self, uuid):
return self.get(uuid=uuid)
def get_queryset(self):
queryset = super().get_queryset()
@ -195,6 +195,7 @@ class Page(models.Model):
objects = PageManager()
snapshots = PageManager(snapshots=True)
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
title = models.CharField(_('Title'), max_length=150)
slug = models.SlugField(_('Slug'))
sub_slug = models.CharField(
@ -262,12 +263,12 @@ class Page(models.Model):
return str(self.title)
def natural_key(self):
return (self.get_online_url(follow_redirection=False).strip('/'),)
return (str(self.uuid),)
@classmethod
@utils.cache_during_request
def get_page_ids_by_slugs(cls):
return {page.slug: page.pk for page in cls.objects.only('pk', 'slug')}
def get_page_ids_by_uuids(cls):
return {str(page.uuid): page.pk for page in cls.objects.only('pk', 'uuid')}
def picture_extension(self):
if not self.picture:
@ -584,13 +585,13 @@ class Page(models.Model):
if page is None:
qs_kwargs = {}
if snapshot:
qs_kwargs = {'snapshot': snapshot}
elif json_page['fields'].get('parent'):
qs_kwargs = {'parent__slug': json_page['fields']['parent'][0].split('/')[-1] or 'index'}
page, created = Page.objects.get_or_create(slug=json_page['fields']['slug'], **qs_kwargs)
qs_kwargs = {'snapshot': snapshot} # don't take uuid from snapshot: it has to be unique !
else:
qs_kwargs = {'uuid': json_page['fields']['uuid']}
page, created = Page.objects.get_or_create(**qs_kwargs)
json_page['pk'] = page.id
parent_slug = json_page['fields'].get('parent') or []
if parent_slug and not Page.objects.filter(slug=parent_slug[0].split('/')[-1] or 'index').exists():
parent_uuid = json_page['fields'].get('parent') or []
if parent_uuid and not Page.objects.filter(uuid=parent_uuid[0]).exists():
# parent not found, remove it and exclude page from navigation
json_page['fields'].pop('parent')
json_page['fields']['exclude_from_navigation'] = True
@ -602,8 +603,12 @@ class Page(models.Model):
)
% json_page['fields']['title'],
)
page_uuid = page.uuid
page = next(serializers.deserialize('json', json.dumps([json_page]), ignorenonexistent=True))
page.object.snapshot = snapshot
if snapshot:
# keep the generated uuid
page.object.uuid = page_uuid
page.save()
for cell in json_page.get('cells'):
cell['fields']['groups'] = [[x] for x in cell['fields']['groups'] if isinstance(x, str)]
@ -622,16 +627,6 @@ class Page(models.Model):
def load_serialized_cells(cls, cells):
# load new cells
for cell_data in cells:
# legacy card cells, for snapshots
if cell_data['model'] == 'wcs.wcscardinfoscell':
cell_data['model'] = 'wcs.wcscardcell'
elif cell_data['model'] == 'wcs.wcscardscell':
cell_data['model'] = 'wcs.wcscardcell'
cell_data['fields']['related_card_path'] = '__all__'
cell_data['fields']['display_mode'] = 'table'
cell_data['fields']['title_type'] = 'auto'
if cell_data['fields'].get('custom_title'):
cell_data['fields']['title_type'] = 'manual'

ici, le code déplacé dans une migration

ici, le code déplacé dans une migration
model = apps.get_model(cell_data['model'])
cell_data = model.prepare_serialized_data(cell_data)
cell = list(serializers.deserialize('json', json.dumps([cell_data]), ignorenonexistent=True))[0]
@ -651,13 +646,7 @@ class Page(models.Model):
for json_page in json_site:
# pre-create pages
parent = None
if json_page['fields'].get('parent'):
parent = json_page['fields']['parent'][0].split('/')[-1] or 'index'
page, created = Page.objects.get_or_create(
slug=json_page['fields']['slug'], parent__slug=parent
)
page, created = Page.objects.get_or_create(uuid=json_page['fields']['uuid'])
to_load.append((page, created, json_page))
# delete cells of already existing pages
@ -720,6 +709,8 @@ class Page(models.Model):
new_page.title = title or _('Copy of %s') % self.title
# reset slug
new_page.slug = None
# reset uuid
new_page.uuid = uuid.uuid4()
# reset snapshot
new_page.snapshot = None
# set order
@ -776,10 +767,12 @@ class PageSnapshot(models.Model):
def restore(self):
json_page = self.serialization
# keep current page uuid
json_page['fields']['uuid'] = str(self.page.uuid)
# keep current page order
json_page['fields']['order'] = self.page.order
# and current parent
json_page['fields']['parent'] = [self.page.parent.slug] if self.page.parent else None
json_page['fields']['parent'] = self.page.parent.natural_key() if self.page.parent else None
# and current exclude_from_navigation value
json_page['fields']['exclude_from_navigation'] = self.page.exclude_from_navigation
# restore snapshot
@ -1717,8 +1710,7 @@ class LinkCell(CellBase):
@classmethod
def prepare_serialized_data(cls, cell_data):
if cell_data['fields'].get('link_page'):
link_page_slug = cell_data['fields']['link_page'][0].strip('/').split('/')[-1]
if link_page_slug not in Page.get_page_ids_by_slugs():
if cell_data['fields']['link_page'][0] not in Page.get_page_ids_by_uuids():
del cell_data['fields']['link_page']
return cell_data

View File

@ -659,7 +659,7 @@ def snapshot_export(request, *args, **kwargs):
page = snapshot.get_page()
response = HttpResponse(content_type='application/json')
response['Content-Disposition'] = 'attachment; filename="export_page_{}_{}.json"'.format(
snapshot.get_page().slug, snapshot.timestamp.strftime('%Y%m%d')
page.slug, snapshot.timestamp.strftime('%Y%m%d')
)
json.dump(page.get_serialized_page(), response, indent=2)
return response

View File

@ -5,6 +5,7 @@ import shutil
import sys
import tarfile
import tempfile
import uuid
from io import BytesIO, StringIO
import pytest
@ -97,7 +98,7 @@ def test_import_export(app, some_data):
def test_import_export_with_parent(app, some_data):
output = get_output_of_command('export_site')
payload = json.loads(output)
payload['pages'][1]['fields']['parent'] = ['one']
payload['pages'][1]['fields']['parent'] = [str(Page.objects.get(slug='one').uuid)]
Page.objects.all().delete()
import_site(data=payload)
@ -111,7 +112,7 @@ def test_import_export_with_unknown_parent(app, some_data):
output = get_output_of_command('export_site')
payload = json.loads(output)
payload['pages'][0]['fields']['exclude_from_navigation'] = False
payload['pages'][0]['fields']['parent'] = ['unknown-parent']
payload['pages'][0]['fields']['parent'] = [str(uuid.uuid4())]
Page.objects.all().delete()
import_site(data=payload)

View File

@ -5,6 +5,7 @@ import os
import re
import shutil
import urllib.parse
import uuid
from io import BytesIO
from unittest import mock
@ -925,12 +926,16 @@ def test_export_page_snapshot(freezer, app, admin_user):
history = app.get('/manage/pages/%s/history' % page.id, status=200)
resp = history.click('export', index=0, verbose=True)
snapshot_page = Page.snapshots.latest('pk')
assert snapshot_page.uuid != page.uuid
assert resp.headers['content-type'] == 'application/json'
assert resp.headers['content-disposition'] == 'attachment; filename="export_page_one_20200717.json"'
assert resp.json['fields']['title'] == 'Updated Title'
resp = history.click('export', index=1, verbose=True)
snapshot_page = Page.snapshots.latest('pk')
assert snapshot_page.uuid != page.uuid
assert resp.headers['content-type'] == 'application/json'
assert resp.headers['content-disposition'] == 'attachment; filename="export_page_one_20200716.json"'
@ -1001,7 +1006,7 @@ def test_export_page_with_redirection(app, admin_user):
resp = app.get('/manage/pages/%s/' % page1.pk)
resp = resp.click('Export')
assert resp.json['pages'][0]['cells'][0]['fields']['link_page'] == [
'two'
str(page2.uuid)
] # and not http://www.example.net
@ -1041,7 +1046,7 @@ def test_site_export_import_json(app, admin_user):
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json')
with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit()
assert len(ctx.captured_queries) in [304, 305]
assert len(ctx.captured_queries) in [295, 296]
assert Page.objects.count() == 4
assert PageSnapshot.objects.all().count() == 4
@ -1052,7 +1057,7 @@ def test_site_export_import_json(app, admin_user):
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json')
with CaptureQueriesContext(connection) as ctx:
resp = resp.form.submit()
assert len(ctx.captured_queries) == 274
assert len(ctx.captured_queries) == 276
assert set(Page.objects.get(slug='one').related_cells['cell_types']) == {'data_textcell', 'data_linkcell'}
assert Page.objects.count() == 4
assert LinkCell.objects.count() == 2
@ -1172,7 +1177,7 @@ def test_site_export_import_unknown_parent(app, admin_user):
resp = resp.form.submit()
payload = json.loads(force_str(resp.body))
payload['pages'][0]['fields']['exclude_from_navigation'] = False
payload['pages'][0]['fields']['parent'] = ['unknown-parent']
payload['pages'][0]['fields']['parent'] = [str(uuid.uuid4())]
resp = app.get('/manage/')
resp = resp.click('Import Site')
@ -1193,13 +1198,15 @@ def test_site_export_import_unknown_page(app, admin_user):
resp = app.get('/manage/site-export')
resp = resp.form.submit()
payload = json.loads(force_str(resp.body))
payload['pages'][0]['cells'][0]['fields']['root_page'] = ['unknown-parent']
payload['pages'][0]['cells'][0]['fields']['root_page'] = [str(uuid.uuid4())]
Page.objects.all().delete()
resp = app.get('/manage/site-import')
resp.form['site_file'] = Upload('site-export.json', force_bytes(json.dumps(payload)), 'application/json')
resp = resp.form.submit()
assert resp.context['form'].errors['site_file'] == ['Unknown page "unknown-parent".']
assert resp.context['form'].errors['site_file'] == [
'Unknown page "%s".' % payload['pages'][0]['cells'][0]['fields']['root_page'][0]
]
assert Page.objects.count() == 0
resp = app.get('/manage/site-import')
@ -1307,6 +1314,7 @@ def test_duplicate_page(app, admin_user):
in resp.text
)
assert new_page.exclude_from_navigation is True
assert new_page.uuid != page.uuid
page.exclude_from_navigation = True
page.save()
@ -2470,9 +2478,6 @@ def test_page_discover_placeholder_with_error_cells(app, admin_user):
def test_page_versionning(app, admin_user):
Page.objects.all().delete()
PageSnapshot.objects.all()
page = Page(title='One', slug='one')
page.save()
@ -2517,8 +2522,10 @@ def test_page_versionning(app, admin_user):
with CaptureQueriesContext(connection) as ctx:
resp2 = resp.click('view', index=1)
assert len(ctx.captured_queries) == 70
assert len(ctx.captured_queries) == 72
assert Page.snapshots.latest('pk').related_cells == {'cell_types': ['data_textcell']}
snapshot_page = Page.snapshots.latest('pk')
assert snapshot_page.uuid != page.uuid
assert resp2.text.index('Hello world') < resp2.text.index('Foobar3')
resp2 = resp.click('view', index=0)
@ -2616,6 +2623,8 @@ def test_restore_page_attributes(app):
snapshot = PageSnapshot.objects.latest('pk')
# move page2, remove page 1
new_uuid = uuid.uuid4()
page2.uuid = new_uuid
page2.parent = page4
page2.order = 3
page2.save()
@ -2635,6 +2644,7 @@ def test_restore_page_attributes(app):
# check some attributes
assert page2.order == 3
assert page2.parent == page4
assert page2.uuid == new_uuid
assert page2.exclude_from_navigation is False
PageSnapshot.take(page2)

504
tests/test_migrations.py Normal file
View File

@ -0,0 +1,504 @@
import uuid
from django.db import connection
from django.db.migrations.executor import MigrationExecutor
def test_page_snapshot_with_old_card_cells_migration(transactional_db):
migrate_from = [('data', '0062_page_uuid')]
migrate_to = [('data', '0063_old_card_cells')]
executor = MigrationExecutor(connection)
old_apps = executor.loader.project_state(migrate_from).apps
executor.migrate(migrate_from)
pagesnapshot_class = old_apps.get_model('data', 'PageSnapshot')
snapshot = pagesnapshot_class.objects.create(
serialization={
'cells': [
{
'model': 'wcs.wcscardinfoscell',
'fields': {
'card_ids': '42,35',
'title_type': 'manual',
'custom_title': 'my-title',
'display_mode': 'card',
'without_user': True,
'custom_schema': {},
'only_for_user': True,
'carddef_reference': 'default:card_model_1',
'related_card_path': '',
'limit': 42,
'slug': 'my-card',
'order': 1,
'groups': [],
'public': True,
'condition': 'my-condition',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-11T13:57:43.362Z',
'restricted_to_unlogged': False,
},
},
{
'model': 'wcs.wcscardscell',
'fields': {
'custom_title': 'my-other-title',
'without_user': False,
'only_for_user': False,
'carddef_reference': 'default:card_model_1',
'limit': 35,
'slug': 'my-other-card',
'order': 2,
'groups': [],
'public': True,
'condition': '',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-12T07:19:18.541Z',
'restricted_to_unlogged': False,
},
},
],
}
)
executor = MigrationExecutor(connection)
executor.migrate(migrate_to)
executor.loader.build_graph()
apps = executor.loader.project_state(migrate_to).apps
pagesnapshot_class = apps.get_model('data', 'PageSnapshot')
snapshot = pagesnapshot_class.objects.get()
assert snapshot.serialization['cells'][0] == {
'model': 'wcs.wcscardcell',
'fields': {
'card_ids': '42,35',
'title_type': 'manual',
'custom_title': 'my-title',
'display_mode': 'card',
'without_user': True,
'custom_schema': {},
'only_for_user': True,
'carddef_reference': 'default:card_model_1',
'related_card_path': '',
'limit': 42,
'slug': 'my-card',
'order': 1,
'groups': [],
'public': True,
'condition': 'my-condition',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-11T13:57:43.362Z',
'restricted_to_unlogged': False,
},
}
assert snapshot.serialization['cells'][1] == {
'model': 'wcs.wcscardcell',
'fields': {
'card_ids': '',
'title_type': 'manual',
'custom_title': 'my-other-title',
'display_mode': 'table',
'without_user': False,
'custom_schema': {},
'only_for_user': False,
'carddef_reference': 'default:card_model_1',
'related_card_path': '__all__',
'limit': 35,
'slug': 'my-other-card',
'order': 2,
'groups': [],
'public': True,
'condition': '',
'placeholder': 'content',
'template_name': None,
'extra_css_class': '',
'last_update_timestamp': '2022-08-12T07:19:18.541Z',
'restricted_to_unlogged': False,
},
}
def test_page_snapshot_uuids_migration(transactional_db):
migrate_from = [('data', '0063_old_card_cells')]
migrate_to = [('data', '0064_snapshot_uuids')]
executor = MigrationExecutor(connection)
old_apps = executor.loader.project_state(migrate_from).apps
executor.migrate(migrate_from)
page_class = old_apps.get_model('data', 'Page')
pagesnapshot_class = old_apps.get_model('data', 'PageSnapshot')
root = page_class.objects.create(order=0, slug='index')
page = page_class.objects.create(order=1, slug='slug')
old_uuid = uuid.uuid4()
link_cells = [
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(old_uuid)],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)], # will not change
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': ['unknown'],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': ['slug'],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': ['foo/bar/slug'],
},
},
]
snapshot1 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': None,
},
'cells': link_cells
+ [
{
'model': 'data.linklistcell',
'fields': {
'links': link_cells,
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(old_uuid),
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid), # will not change
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': 'unknown',
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': 'slug',
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': 'foo/bar/slug',
}
}
}
},
},
{
'model': 'wcs.wcscardcell',
'fields': {
'custom_schema': {
'cells': [
{
'page': None,
},
{
'page': str(old_uuid),
},
{
'page': str(page.uuid), # will not change
},
{
'page': 'unknown',
},
{
'page': 'slug',
},
{
'page': 'foo/bar/slug',
},
],
},
},
},
],
}
)
snapshot2 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': [str(old_uuid)],
}
}
)
snapshot3 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': [str(page.uuid)], # will not change
}
}
)
snapshot4 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': ['unknown'],
}
}
)
snapshot5 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': ['slug'],
}
}
)
snapshot6 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': ['foo/bar/slug'],
}
}
)
snapshot7 = pagesnapshot_class.objects.create(
serialization={
'fields': {
'parent': [''], # index !
}
}
)
executor = MigrationExecutor(connection)
executor.migrate(migrate_to)
executor.loader.build_graph()
apps = executor.loader.project_state(migrate_to).apps
pagesnapshot_class = apps.get_model('data', 'PageSnapshot')
snapshot1 = pagesnapshot_class.objects.get(pk=snapshot1.pk)
assert snapshot1.serialization['fields']['parent'] is None
new_link_cells = [
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': None,
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)],
},
},
{
'model': 'data.linkcell',
'fields': {
'link_page': [str(page.uuid)],
},
},
]
assert snapshot1.serialization['cells'] == new_link_cells + [
{
'model': 'data.linklistcell',
'fields': {
'links': new_link_cells,
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid),
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': None,
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid),
}
}
}
},
},
{
'model': 'search.searchcell',
'fields': {
'_search_services': {
'options': {
'foobar': {
'target_page': str(page.uuid),
}
}
}
},
},
{
'model': 'wcs.wcscardcell',
'fields': {
'custom_schema': {
'cells': [
{
'page': None,
},
{
'page': None,
},
{
'page': str(page.uuid),
},
{
'page': None,
},
{
'page': str(page.uuid),
},
{
'page': str(page.uuid),
},
],
},
},
},
]
snapshot2 = pagesnapshot_class.objects.get(pk=snapshot2.pk)
assert snapshot2.serialization['fields']['parent'] is None
snapshot3 = pagesnapshot_class.objects.get(pk=snapshot3.pk)
assert snapshot3.serialization['fields']['parent'] == [str(page.uuid)]
snapshot4 = pagesnapshot_class.objects.get(pk=snapshot4.pk)
assert snapshot4.serialization['fields']['parent'] is None
snapshot5 = pagesnapshot_class.objects.get(pk=snapshot5.pk)
assert snapshot5.serialization['fields']['parent'] == [str(page.uuid)]
snapshot6 = pagesnapshot_class.objects.get(pk=snapshot6.pk)
assert snapshot6.serialization['fields']['parent'] == [str(page.uuid)]
snapshot7 = pagesnapshot_class.objects.get(pk=snapshot7.pk)
assert snapshot7.serialization['fields']['parent'] == [str(root.uuid)]

View File

@ -2,6 +2,7 @@ import datetime
import json
import os
import sys
import uuid
from io import StringIO
from unittest import mock
@ -1689,12 +1690,9 @@ def test_test_export_import_search_cell_with_target_page():
site_export = get_output_of_command('export_site')
site_data = json.loads(site_export)
assert len(site_data['pages']) == 3
assert (
site_data['pages'][-1]['cells'][0]['fields']['_search_services']['options'][
'cards:c21f969b:card-bar'
]['target_page']
== 'root/card'
)
assert site_data['pages'][-1]['cells'][0]['fields']['_search_services']['options'][
'cards:c21f969b:card-bar'
]['target_page'] == str(card_page.uuid)
import_site(data={}, clean=True)
assert Page.objects.all().count() == 0
@ -1717,8 +1715,8 @@ def test_test_export_import_search_cell_with_target_page():
site_data['pages'][-1]['cells'][0]['fields']['_search_services']['options']['cards:c21f969b:card-bar'][
'target_page'
] = 'unknown'
] = str(uuid.uuid4())
import_site(data=site_data, clean=True)
new_card_page = Page.objects.get(slug='card')
new_cell = SearchCell.objects.get()
assert new_cell._search_services['options']['cards:c21f969b:card-bar']['target_page'] == ''
assert new_cell._search_services['options']['cards:c21f969b:card-bar']['target_page'] is None

View File

@ -17,7 +17,7 @@ from requests.models import Response
from combo.apps.wcs.forms import WcsCardCellDisplayForm
from combo.apps.wcs.models import WcsCardCell
from combo.data.models import Page, PageSnapshot, TextCell, ValidityInfo
from combo.data.models import Page, TextCell, ValidityInfo
from combo.data.utils import import_site
from tests.test_manager import login
from tests.utils import manager_submit_cell
@ -3264,90 +3264,6 @@ def test_card_cell_assets(mock_send, settings, app, admin_user):
assert 'Logo — %s' % cell2.get_label_for_asset() in resp.text
def test_page_snapshot_with_old_card_cells():
page = Page.objects.create(title='xxx', slug='test_snapshots', template_name='standard')
PageSnapshot.take(page)
snapshot = page.pagesnapshot_set.first()
snapshot.serialization['cells'].append(
{
'model': 'wcs.wcscardinfoscell',
'fields': {
'slug': 'my-card',
'limit': 42,
'order': 1,
'groups': [],
'public': True,
'card_ids': '42,35',
'condition': 'my-condition',
'title_type': 'manual',
'placeholder': 'content',
'custom_title': 'my-title',
'display_mode': 'card',
'without_user': True,
'custom_schema': {},
'only_for_user': True,
'template_name': None,
'extra_css_class': '',
'carddef_reference': 'default:card_model_1',
'related_card_path': '',
'last_update_timestamp': '2022-08-11T13:57:43.362Z',
'restricted_to_unlogged': False,
'page': page.pk,
},
}
)
snapshot.serialization['cells'].append(
{
'model': 'wcs.wcscardscell',
'fields': {
'slug': 'my-other-card',
'limit': 35,
'order': 2,
'groups': [],
'public': True,
'condition': '',
'placeholder': 'content',
'custom_title': 'my-other-title',
'without_user': False,
'only_for_user': False,
'template_name': None,
'extra_css_class': '',
'carddef_reference': 'default:card_model_1',
'last_update_timestamp': '2022-08-12T07:19:18.541Z',
'restricted_to_unlogged': False,
'page': page.pk,
},
}
)
old_page = snapshot.get_page()
cell1 = old_page.get_cells()[0]
assert isinstance(cell1, WcsCardCell)
assert cell1.slug == 'my-card'
assert cell1.limit == 42
assert cell1.card_ids == '42,35'
assert cell1.title_type == 'manual'
assert cell1.custom_title == 'my-title'
assert cell1.display_mode == 'card'
assert cell1.without_user is True
assert cell1.custom_schema == {}
assert cell1.only_for_user is True
assert cell1.carddef_reference == 'default:card_model_1'
assert cell1.related_card_path == ''
cell2 = old_page.get_cells()[1]
assert isinstance(cell1, WcsCardCell)
assert cell2.slug == 'my-other-card'
assert cell2.limit == 35
assert cell2.card_ids == ''
assert cell2.title_type == 'manual'
assert cell2.custom_title == 'my-other-title'
assert cell2.display_mode == 'table'
assert cell2.without_user is False
assert cell2.custom_schema == {}
assert cell2.only_for_user is False
assert cell2.carddef_reference == 'default:card_model_1'
assert cell2.related_card_path == '__all__'
def get_output_of_command(command, *args, **kwargs):
old_stdout = sys.stdout
output = sys.stdout = StringIO()
@ -3387,7 +3303,9 @@ def test_export_import_card_cell_with_page_link():
site_export = get_output_of_command('export_site')
site_data = json.loads(site_export)
assert len(site_data['pages']) == 3
assert site_data['pages'][-1]['cells'][0]['fields']['custom_schema']['cells'][0]['page'] == 'root/card'
assert site_data['pages'][-1]['cells'][0]['fields']['custom_schema']['cells'][0]['page'] == str(
card_page.uuid
)
import_site(data={}, clean=True)
assert Page.objects.all().count() == 0