misc: remove formdata/carddata pickle storage (#73930) #82
|
@ -361,6 +361,7 @@ def test_form_title_change(pub):
|
|||
formdef.name = 'form title'
|
||||
formdef.fields = []
|
||||
formdef.store()
|
||||
assert formdef.table_name == f'formdata_{formdef.id}_form_title'
|
||||
|
||||
app = login(get_app(pub))
|
||||
|
||||
|
@ -376,7 +377,7 @@ def test_form_title_change(pub):
|
|||
formdef = FormDef.get(formdef.id)
|
||||
assert formdef.name == 'new title'
|
||||
assert formdef.url_name == 'form-title'
|
||||
assert formdef.internal_identifier == 'new-title'
|
||||
assert formdef.table_name == f'formdata_{formdef.id}_form_title'
|
||||
|
||||
resp = app.get('/backoffice/forms/1/')
|
||||
resp = resp.click('change title')
|
||||
|
@ -1298,8 +1299,8 @@ def test_form_import(pub):
|
|||
assert FormDef.count() == 2
|
||||
assert FormDef.get(1).url_name == 'form-title'
|
||||
assert FormDef.get(2).url_name == 'form-title-1'
|
||||
assert FormDef.get(1).internal_identifier == 'form-title'
|
||||
assert FormDef.get(2).internal_identifier == 'form-title-1'
|
||||
assert FormDef.get(1).table_name == 'formdata_1_form_title'
|
||||
assert FormDef.get(2).table_name == 'formdata_2_form_title_1'
|
||||
|
||||
# import a formdef with an url name that doesn't match its title,
|
||||
# it should be kept intact.
|
||||
|
@ -1311,7 +1312,7 @@ def test_form_import(pub):
|
|||
resp.forms[0]['file'] = Upload('formdef.wcs', formdef_xml)
|
||||
resp = resp.forms[0].submit()
|
||||
assert FormDef.get(3).url_name == 'xxx-other-form-title'
|
||||
assert FormDef.get(3).internal_identifier == 'form-title-2'
|
||||
assert FormDef.get(3).table_name == 'formdata_3_xxx_other_form_title'
|
||||
|
||||
# import an invalid file
|
||||
resp = app.get('/backoffice/forms/')
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -1,181 +0,0 @@
|
|||
import os
|
||||
import random
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from django.core.management import call_command, get_commands
|
||||
from django.core.management.base import CommandError
|
||||
from quixote import get_publisher
|
||||
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.fields import BoolField
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.roles import Role
|
||||
from wcs.sql import AnyFormData, cleanup_connection
|
||||
|
||||
from .utilities import clean_temporary_pub, create_temporary_pub, force_connections_close
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def formdeffix():
|
||||
for formdef in FormDef.select():
|
||||
formdef.remove_self()
|
||||
formdef = FormDef()
|
||||
formdef.id = 123
|
||||
formdef.name = 'testform'
|
||||
formdef.description = 'plop'
|
||||
formdef.fields = [BoolField(id='1')]
|
||||
formdef.store()
|
||||
|
||||
data_class = formdef.data_class(mode='files')
|
||||
for value in (True, True, True, False):
|
||||
formdata = data_class()
|
||||
formdata.data = {'1': value}
|
||||
formdata.store()
|
||||
|
||||
return formdef
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def carddeffix():
|
||||
for carddef in CardDef.select():
|
||||
carddef.remove_self()
|
||||
carddef = CardDef()
|
||||
carddef.id = 456
|
||||
carddef.name = 'testcard'
|
||||
carddef.description = 'plop'
|
||||
carddef.fields = [BoolField(id='1')]
|
||||
carddef.store()
|
||||
|
||||
data_class = carddef.data_class(mode='files')
|
||||
for value in (True, True, True, False):
|
||||
formdata = data_class()
|
||||
formdata.data = {'1': value}
|
||||
formdata.store()
|
||||
|
||||
return carddef
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def cursor():
|
||||
conn = psycopg2.connect(user=os.environ.get('USER'), dbname='postgres')
|
||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = conn.cursor()
|
||||
yield cur
|
||||
cur.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def database(cursor):
|
||||
i = 0
|
||||
while True:
|
||||
dbname = 'wcstests%d' % random.randint(0, 100000)
|
||||
try:
|
||||
cursor.execute('CREATE DATABASE %s' % dbname)
|
||||
break
|
||||
except psycopg2.Error:
|
||||
if i < 5:
|
||||
i += 1
|
||||
continue
|
||||
raise
|
||||
yield dbname
|
||||
cleanup_connection()
|
||||
cursor.execute('DROP DATABASE %s' % dbname)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def pub(request):
|
||||
pub = create_temporary_pub(pickle_mode=True)
|
||||
yield pub
|
||||
clean_temporary_pub()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def local_user():
|
||||
get_publisher().user_class.wipe()
|
||||
user = get_publisher().user_class()
|
||||
user.name = 'Jean Darmette'
|
||||
user.email = 'jean.darmette@triffouilis.fr'
|
||||
user.name_identifiers = ['0123456789']
|
||||
user.store()
|
||||
return user
|
||||
|
||||
|
||||
def test_command_exists():
|
||||
assert 'convert_to_sql' in get_commands()
|
||||
|
||||
|
||||
def test_unknown_publisher_fails(pub):
|
||||
with pytest.raises(CommandError) as excinfo:
|
||||
call_command('convert_to_sql', '-d', 'unknown.net', '--database', 'foobar')
|
||||
assert str(excinfo.value) == 'unknown tenant'
|
||||
|
||||
|
||||
def test_failing_connection(pub):
|
||||
with pytest.raises(psycopg2.OperationalError) as excinfo:
|
||||
call_command('convert_to_sql', '-d', 'example.net', '--database', 'foobar', '--port', '666')
|
||||
assert 'could not connect' in str(excinfo.value) or 'connection to server on socket' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_database_does_not_exist(pub):
|
||||
new_database = 'test_%s' % random.randint(1000, 9999)
|
||||
with pytest.raises(psycopg2.OperationalError) as excinfo:
|
||||
call_command('convert_to_sql', '-d', 'example.net', '--database', new_database)
|
||||
assert 'exist' in str(excinfo.value) # works for english + french postgresql
|
||||
|
||||
|
||||
def test_already_migrated_fails():
|
||||
create_temporary_pub()
|
||||
with pytest.raises(CommandError) as excinfo:
|
||||
call_command('convert_to_sql', '-d', 'example.net', '--database', 'foobar')
|
||||
assert str(excinfo.value) == 'tenant already using postgresql'
|
||||
cleanup_connection()
|
||||
force_connections_close()
|
||||
clean_temporary_pub()
|
||||
|
||||
|
||||
def test_setup_database(pub, database):
|
||||
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
||||
pub.set_config()
|
||||
assert pub.cfg['postgresql'].get('database') == database
|
||||
|
||||
|
||||
def test_migration(pub, database):
|
||||
assert 'postgresql' not in pub.cfg
|
||||
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
||||
pub.set_config()
|
||||
assert 'postgresql' in pub.cfg
|
||||
|
||||
|
||||
def test_data_is_migrated(pub, database, local_user, formdeffix, carddeffix):
|
||||
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
||||
pub.set_config()
|
||||
formdefs = FormDef.select()
|
||||
assert len(formdefs) == 1
|
||||
data_class = formdefs[0].data_class(mode='sql')
|
||||
assert len(data_class.keys()) == 4
|
||||
carddefs = CardDef.select()
|
||||
assert len(carddefs) == 1
|
||||
data_class = carddefs[0].data_class(mode='sql')
|
||||
assert len(data_class.keys()) == 4
|
||||
|
||||
# check global table is created
|
||||
objects = AnyFormData.select()
|
||||
assert len(objects) == 4
|
||||
|
||||
# check triggers were created
|
||||
formdata = formdefs[0].data_class(mode='sql')()
|
||||
formdata.just_created()
|
||||
formdata.store()
|
||||
|
||||
objects = AnyFormData.select()
|
||||
assert len(objects) == 5
|
||||
|
||||
|
||||
def test_users_and_roles(pub, database, local_user):
|
||||
role = Role(name='Test Role')
|
||||
role.store()
|
||||
|
||||
call_command('convert_to_sql', '-d', 'example.net', '--database', database)
|
||||
assert len(pub.user_class.get_users_with_name_identifier('0123456789')) == 1
|
||||
assert pub.role_class.count() == 1
|
|
@ -115,20 +115,20 @@ def test_title_change(pub):
|
|||
formdef.store()
|
||||
assert FormDef.get(formdef.id).name == 'foo'
|
||||
assert FormDef.get(formdef.id).url_name == 'foo'
|
||||
assert FormDef.get(formdef.id).internal_identifier == 'foo'
|
||||
assert FormDef.get(formdef.id).table_name == f'formdata_{formdef.id}_foo'
|
||||
|
||||
# makes sure the table_name never changes
|
||||
formdef.name = 'bar'
|
||||
formdef.store()
|
||||
assert FormDef.get(formdef.id).name == 'bar'
|
||||
assert FormDef.get(formdef.id).url_name == 'foo'
|
||||
assert FormDef.get(formdef.id).internal_identifier == 'bar'
|
||||
assert FormDef.get(formdef.id).table_name == f'formdata_{formdef.id}_foo' # didn't change
|
||||
|
||||
# makes sure the internal_name doesn't change if there are submitted forms
|
||||
formdef.data_class()().store()
|
||||
formdef.name = 'baz'
|
||||
formdef.store()
|
||||
assert FormDef.get(formdef.id).name == 'baz'
|
||||
assert FormDef.get(formdef.id).internal_identifier == 'bar' # didn't change
|
||||
assert FormDef.get(formdef.id).table_name == f'formdata_{formdef.id}_foo' # didn't change
|
||||
|
||||
|
||||
def test_overlong_slug(pub):
|
||||
|
@ -213,26 +213,6 @@ def test_substitution_variables_object(pub):
|
|||
assert substs.formdef is formdef
|
||||
|
||||
|
||||
def test_internal_identifier_migration(pub):
|
||||
FormDef.wipe()
|
||||
formdef = FormDef()
|
||||
formdef.name = 'foo'
|
||||
formdef.fields = []
|
||||
formdef.store()
|
||||
|
||||
with open(formdef.get_object_filename(), 'rb') as fd:
|
||||
obj = pickle.load(fd)
|
||||
del obj.internal_identifier
|
||||
with open(formdef.get_object_filename(), 'wb') as fd:
|
||||
pickle.dump(obj, fd)
|
||||
with open(formdef.get_object_filename(), 'rb') as fd:
|
||||
assert pickle.load(fd).internal_identifier is None
|
||||
assert FormDef.get(formdef.id, ignore_migration=True).internal_identifier is None
|
||||
|
||||
formdef = FormDef.get(formdef.id)
|
||||
assert formdef.internal_identifier == 'foo'
|
||||
|
||||
|
||||
def test_unused_file_removal_job(pub):
|
||||
from wcs.formdef import clean_unused_files
|
||||
|
||||
|
|
|
@ -8,8 +8,6 @@ import pytest
|
|||
from quixote import cleanup
|
||||
|
||||
import wcs.qommon.storage as st
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.qommon.storage import StorableObject, cache_umask
|
||||
|
||||
from .utilities import create_temporary_pub
|
||||
|
@ -489,30 +487,3 @@ def test_umask():
|
|||
cache_umask()
|
||||
test.store()
|
||||
assert (os.stat(test.get_object_filename()).st_mode % 0o1000) == 0o664
|
||||
|
||||
|
||||
def test_load_old_pickle():
|
||||
# check picklized old data (name without '_wcs_' prefix)
|
||||
# formdata
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), 'data', 'oldpickledata', 'formdefs'),
|
||||
os.path.join(pub.app_dir, 'formdefs'),
|
||||
)
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), 'data', 'oldpickledata', 'form-foo'),
|
||||
os.path.join(pub.app_dir, 'form-foo'),
|
||||
)
|
||||
formdef = FormDef.select()[0]
|
||||
formdef.data_class(mode='files').select()
|
||||
|
||||
# carddata
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), 'data', 'oldpickledata', 'carddefs'),
|
||||
os.path.join(pub.app_dir, 'carddefs'),
|
||||
)
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), 'data', 'oldpickledata', 'card-bar'),
|
||||
os.path.join(pub.app_dir, 'card-bar'),
|
||||
)
|
||||
carddef = CardDef.select()[0]
|
||||
carddef.data_class(mode='files').select()
|
||||
|
|
|
@ -1412,9 +1412,8 @@ class FormDefPage(Directory):
|
|||
self.formdef.fields = [x for x in self.formdef.fields if x.id not in incompatible_field_ids]
|
||||
self.formdef.store(comment=_('Overwritten (removal of incompatible fields)'))
|
||||
|
||||
# keep current formdef id, url_name, internal identifier and sql table name
|
||||
# keep current formdef id, url_name, and sql table name
|
||||
new_formdef.id = self.formdef.id
|
||||
new_formdef.internal_identifier = self.formdef.internal_identifier
|
||||
new_formdef.url_name = self.formdef.url_name
|
||||
new_formdef.table_name = self.formdef.table_name
|
||||
# keep currently assigned category and workflow
|
||||
|
@ -1905,7 +1904,6 @@ class FormsDirectory(AccessControlled, Directory):
|
|||
raise ValueError()
|
||||
|
||||
self.imported_formdef = formdef
|
||||
formdef.internal_identifier = None # a new one will be set in .store()
|
||||
formdef.disabled = True
|
||||
formdef.store()
|
||||
return redirect('%s/' % formdef.id)
|
||||
|
|
|
@ -296,7 +296,6 @@ class BundleImportJob(AfterJob):
|
|||
|
||||
if element['type'] in ('forms', 'cards'):
|
||||
# keep internal references
|
||||
new_object.internal_identifier = existing_object.internal_identifier
|
||||
new_object.table_name = existing_object.table_name
|
||||
|
||||
if element['type'] in ('forms', 'cards') and not existing_object.name.startswith('[pre-import]'):
|
||||
|
|
|
@ -22,7 +22,6 @@ from subprocess import PIPE, Popen
|
|||
from django.utils.encoding import force_bytes
|
||||
from quixote import get_publisher
|
||||
|
||||
from wcs.carddata import CardData
|
||||
from wcs.categories import CardDefCategory
|
||||
from wcs.formdef import FormDef, FormDefDoesNotExist, get_formdefs_of_all_kinds
|
||||
|
||||
|
@ -62,19 +61,12 @@ class CardDef(FormDef):
|
|||
# carddef
|
||||
if data_class._formdef is self:
|
||||
return data_class
|
||||
if (not mode == 'files') or mode == 'sql':
|
||||
from . import sql
|
||||
from . import sql
|
||||
|
||||
table_name = sql.get_formdef_table_name(self)
|
||||
cls = types.ClassType(
|
||||
self.data_class_name, (sql.SqlCardData,), {'_formdef': self, '_table_name': table_name}
|
||||
)
|
||||
else:
|
||||
cls = types.ClassType(
|
||||
self.data_class_name,
|
||||
(CardData,),
|
||||
{'_names': 'card-%s' % self.internal_identifier, '_formdef': self},
|
||||
)
|
||||
table_name = sql.get_formdef_table_name(self)
|
||||
cls = types.ClassType(
|
||||
self.data_class_name, (sql.SqlCardData,), {'_formdef': self, '_table_name': table_name}
|
||||
)
|
||||
setattr(sys.modules['carddef'], self.data_class_name, cls)
|
||||
setattr(sys.modules['wcs.carddef'], self.data_class_name, cls)
|
||||
|
||||
|
|
|
@ -1,174 +0,0 @@
|
|||
# w.c.s. - web application for online forms
|
||||
# Copyright (C) 2005-2018 Entr'ouvert
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import psycopg2
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
|
||||
from wcs import sql
|
||||
from wcs.carddef import CardDef
|
||||
from wcs.formdef import FormDef
|
||||
from wcs.qommon.misc import localstrftime
|
||||
from wcs.qommon.publisher import UnknownTenantError, get_publisher_class
|
||||
from wcs.roles import Role
|
||||
from wcs.users import User
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
|
||||
help = 'Setup postgresql connection parameters and migrate existing objects.'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('-d', '--domain', required=True)
|
||||
parser.add_argument('--database', required=True)
|
||||
parser.add_argument('--host')
|
||||
parser.add_argument('--port', type=int)
|
||||
parser.add_argument('--user')
|
||||
parser.add_argument('--password')
|
||||
|
||||
def handle(self, **options):
|
||||
self.publisher = self.get_publisher(options['domain'])
|
||||
if self.publisher.has_postgresql_config():
|
||||
raise CommandError('tenant already using postgresql')
|
||||
|
||||
self.setup_connection(**options)
|
||||
sql.get_connection(new=True)
|
||||
|
||||
# create early tables
|
||||
sql.do_snapshots_table()
|
||||
sql.do_custom_views_table()
|
||||
|
||||
self.store_roles()
|
||||
self.store_users()
|
||||
self.store_forms()
|
||||
self.publisher.write_cfg()
|
||||
self.publisher.initialize_sql()
|
||||
self.publisher.cleanup()
|
||||
|
||||
def get_publisher(self, domain):
|
||||
publisher_class = get_publisher_class()
|
||||
publisher = publisher_class.create_publisher()
|
||||
try:
|
||||
publisher.set_tenant_by_hostname(domain)
|
||||
except UnknownTenantError:
|
||||
raise CommandError('unknown tenant')
|
||||
return publisher
|
||||
|
||||
def setup_connection(self, **kwargs):
|
||||
options = {}
|
||||
for k in ['host', 'port', 'database', 'user', 'password']:
|
||||
if k in kwargs:
|
||||
options[k] = kwargs.get(k)
|
||||
self.publisher.cfg['postgresql'] = options
|
||||
|
||||
def store_users(self):
|
||||
self.convert_objects('user', User, sql.SqlUser, skip_global_forms_table_update=True)
|
||||
sql.SqlUser.fix_sequences()
|
||||
|
||||
def store_roles(self):
|
||||
self.convert_objects('role', Role, sql.Role)
|
||||
|
||||
def convert_objects(self, object_name, object_class, object_sql_class, **kwargs):
|
||||
errors = []
|
||||
print('converting %ss' % object_name)
|
||||
getattr(sql, 'do_%s_table' % object_name)()
|
||||
count = object_class.count()
|
||||
for i, obj_id in enumerate(object_class.keys()):
|
||||
obj = object_class.get(obj_id)
|
||||
obj.__class__ = object_sql_class
|
||||
try:
|
||||
obj.store(**kwargs)
|
||||
except AssertionError:
|
||||
errors.append((obj, traceback.format_exc()))
|
||||
self.update_progress(100 * i / count)
|
||||
|
||||
if errors:
|
||||
with open('error_%s.log' % object_name, 'w') as error_log:
|
||||
for obj, trace in errors:
|
||||
error_log.write('obj_id %s\n' % obj.id)
|
||||
error_log.write(trace)
|
||||
error_log.write('-' * 80)
|
||||
error_log.write('\n\n')
|
||||
print('There were some errors, see error_%s.log for details.' % object_name)
|
||||
|
||||
def store_forms(self):
|
||||
errors = []
|
||||
for formdef in FormDef.select(ignore_migration=True) + CardDef.select(ignore_migration=True):
|
||||
print('converting %s' % formdef.name)
|
||||
sql.do_formdef_tables(formdef, rebuild_views=True, rebuild_global_views=True)
|
||||
if hasattr(sys.modules[formdef.pickle_module_name], formdef.data_class_name):
|
||||
delattr(sys.modules[formdef.pickle_module_name], formdef.data_class_name)
|
||||
data_class = formdef.data_class(mode='files')
|
||||
|
||||
if formdef.url_name != formdef.internal_identifier:
|
||||
# handle maybe different name
|
||||
setattr(
|
||||
sys.modules[formdef.pickle_module_name],
|
||||
'_wcs_%s' % formdef.internal_identifier.title(),
|
||||
getattr(sys.modules['formdef'], formdef.data_class_name),
|
||||
)
|
||||
|
||||
count = data_class.count()
|
||||
|
||||
# load all objects a first time, to allow the migrate() code to be
|
||||
# run and the eventual changes properly saved.
|
||||
for id in data_class.keys():
|
||||
try:
|
||||
formdata = data_class.get(id)
|
||||
except AttributeError:
|
||||
continue
|
||||
delattr(sys.modules[formdef.pickle_module_name], formdef.data_class_name)
|
||||
|
||||
# once this is done, reload and store everything in postgresql
|
||||
sql_data_class = formdef.data_class(mode='sql')
|
||||
for i, id in enumerate(data_class.keys()):
|
||||
try:
|
||||
formdata = data_class.get(id)
|
||||
except AttributeError:
|
||||
errors.append((formdata, traceback.format_exc()))
|
||||
continue
|
||||
formdata._formdef = formdef
|
||||
formdata._evolution = formdata.evolution
|
||||
formdata.__class__ = sql_data_class
|
||||
try:
|
||||
formdata.store()
|
||||
except (AssertionError, psycopg2.DataError):
|
||||
errors.append((formdata, traceback.format_exc()))
|
||||
self.update_progress(100 * i / count)
|
||||
sql_data_class.fix_sequences()
|
||||
|
||||
if formdef.migrate():
|
||||
# run formdef migration as we're done and table name can be stored etc.
|
||||
formdef.store()
|
||||
|
||||
if errors:
|
||||
with open('error_formdata.log', 'w') as error_log:
|
||||
for formdata, trace in errors:
|
||||
error_log.write(
|
||||
'%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time))
|
||||
)
|
||||
error_log.write(trace)
|
||||
error_log.write('-' * 80)
|
||||
error_log.write('\n\n')
|
||||
print('There were some errors, see error_formdata.log.')
|
||||
|
||||
def update_progress(self, progress, num_columns=120):
|
||||
sys.stdout.write(
|
||||
'[%s] %s%%\r'
|
||||
% (('#' * int((num_columns - 10) * progress / 100)).ljust(num_columns - 15), progress)
|
||||
)
|
|
@ -37,7 +37,6 @@ from quixote.http_request import Upload
|
|||
|
||||
from . import data_sources, fields
|
||||
from .categories import Category
|
||||
from .formdata import FormData
|
||||
from .qommon import PICKLE_KWARGS, _, force_str, get_cfg, pgettext_lazy
|
||||
from .qommon.admin.emails import EmailsDirectory
|
||||
from .qommon.afterjobs import AfterJob
|
||||
|
@ -134,7 +133,6 @@ class FormDef(StorableObject):
|
|||
description = None
|
||||
keywords = None
|
||||
url_name = None
|
||||
internal_identifier = None # used to have a stable pickle object class name
|
||||
table_name = None # for SQL only
|
||||
fields = None
|
||||
category_id = None
|
||||
|
@ -186,7 +184,6 @@ class FormDef(StorableObject):
|
|||
'keywords',
|
||||
'publication_date',
|
||||
'expiration_date',
|
||||
'internal_identifier',
|
||||
'disabled_redirection',
|
||||
'appearance_keywords',
|
||||
'lateral_template',
|
||||
|
@ -252,10 +249,6 @@ class FormDef(StorableObject):
|
|||
changed = True
|
||||
break
|
||||
|
||||
if not self.internal_identifier:
|
||||
self.internal_identifier = self.url_name
|
||||
changed = True
|
||||
|
||||
for f in self.fields or []:
|
||||
changed |= f.migrate()
|
||||
|
||||
|
@ -288,19 +281,13 @@ class FormDef(StorableObject):
|
|||
# formdef
|
||||
if data_class._formdef is self:
|
||||
return data_class
|
||||
if (not mode == 'files') or mode == 'sql':
|
||||
from . import sql
|
||||
|
||||
table_name = sql.get_formdef_table_name(self)
|
||||
cls = types.ClassType(
|
||||
self.data_class_name, (sql.SqlFormData,), {'_formdef': self, '_table_name': table_name}
|
||||
)
|
||||
else:
|
||||
cls = types.ClassType(
|
||||
self.data_class_name,
|
||||
(FormData,),
|
||||
{'_names': 'form-%s' % self.internal_identifier, '_formdef': self},
|
||||
)
|
||||
from . import sql
|
||||
|
||||
table_name = sql.get_formdef_table_name(self)
|
||||
cls = types.ClassType(
|
||||
self.data_class_name, (sql.SqlFormData,), {'_formdef': self, '_table_name': table_name}
|
||||
)
|
||||
setattr(sys.modules['formdef'], self.data_class_name, cls)
|
||||
setattr(sys.modules['wcs.formdef'], self.data_class_name, cls)
|
||||
|
||||
|
@ -329,21 +316,6 @@ class FormDef(StorableObject):
|
|||
new_url_name = '%s-%s' % (base_new_url_name, suffix_no)
|
||||
return new_url_name
|
||||
|
||||
def get_new_internal_identifier(self):
|
||||
new_internal_identifier = simplify(self.name)
|
||||
base_new_internal_identifier = new_internal_identifier
|
||||
suffix_no = 0
|
||||
while True:
|
||||
try:
|
||||
formdef = self.get_by_urlname(new_internal_identifier, ignore_migration=True)
|
||||
except KeyError:
|
||||
break
|
||||
if formdef.id == self.id:
|
||||
break
|
||||
suffix_no += 1
|
||||
new_internal_identifier = '%s-%s' % (base_new_internal_identifier, suffix_no)
|
||||
return new_internal_identifier
|
||||
|
||||
@classmethod
|
||||
def get_new_id(cls, create=False):
|
||||
id = super().get_new_id(create=False)
|
||||
|
@ -422,18 +394,6 @@ class FormDef(StorableObject):
|
|||
|
||||
object_only = kwargs.pop('object_only', False)
|
||||
|
||||
new_internal_identifier = self.get_new_internal_identifier()
|
||||
if not self.internal_identifier:
|
||||
self.internal_identifier = new_internal_identifier
|
||||
if new_internal_identifier != self.internal_identifier:
|
||||
# title changed, internal identifier will be changed only if
|
||||
# the formdef is currently being imported (self.id is None)
|
||||
# or if there are not yet any submitted forms (or if site
|
||||
# is using the SQL storage as internal identifier is not used
|
||||
# in that mode.
|
||||
if self.id is None or not self.data_class().exists():
|
||||
self.internal_identifier = new_internal_identifier
|
||||
|
||||
if not object_only:
|
||||
self.update_relations()
|
||||
|
||||
|
|
Loading…
Reference in New Issue