lingo/lingo/invoicing/models.py

451 lines
16 KiB
Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import sys
import traceback
from django.contrib.auth.models import Group
from django.contrib.postgres.fields import JSONField
from django.core.serializers.json import DjangoJSONEncoder
from django.db import connection, models, transaction
from django.utils.formats import date_format
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from lingo.agendas.chrono import ChronoError
from lingo.utils.misc import generate_slug
class RegieImportError(Exception):
pass
class RegieNotConfigured(Exception):
def __init__(self, msg):
self.msg = msg
class PoolPromotionError(Exception):
def __init__(self, msg):
self.msg = msg
class Regie(models.Model):
label = models.CharField(_('Label'), max_length=150)
slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
description = models.TextField(
_('Description'), null=True, blank=True, help_text=_('Optional regie description.')
)
cashier_role = models.ForeignKey(
Group,
blank=True,
null=True,
default=None,
related_name='+',
verbose_name=_('Cashier Role'),
on_delete=models.SET_NULL,
)
counter_name = models.CharField(
_('Counter name'),
default='{yy}',
max_length=50,
)
number_format = models.CharField(
_('Number format'),
default='F{regie_id:02d}-{yy}-{mm}-{number:07d}',
max_length=100,
)
class Meta:
ordering = ['label']
def __str__(self):
return self.label
def save(self, *args, **kwargs):
if not self.slug:
self.slug = generate_slug(self)
super().save(*args, **kwargs)
@property
def base_slug(self):
return slugify(self.label)
def export_json(self):
return {
'label': self.label,
'slug': self.slug,
'description': self.description,
'permissions': {
'cashier': self.cashier_role.name if self.cashier_role else None,
},
}
@classmethod
def import_json(cls, data):
data = copy.deepcopy(data)
permissions = data.pop('permissions') or {}
role_name = permissions.get('cashier')
if role_name:
try:
data['cashier_role'] = Group.objects.get(name=role_name)
except Group.DoesNotExists:
raise RegieImportError('Missing role: %s' % role_name)
except Group.MultipleObjectsReturned:
raise RegieImportError('Multiple role exist with the name: %s' % role_name)
regie, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
return created, regie
def get_counter_name(self, invoice_date):
return self.counter_name.format(
yyyy=invoice_date.strftime('%Y'),
yy=invoice_date.strftime('%y'),
mm=invoice_date.strftime('%m'),
)
def format_number(self, invoice_date, invoice_number):
return self.number_format.format(
yyyy=invoice_date.strftime('%Y'),
yy=invoice_date.strftime('%y'),
mm=invoice_date.strftime('%m'),
number=invoice_number,
regie_id=self.pk,
)
class Campaign(models.Model):
date_start = models.DateField(_('Start date'))
date_end = models.DateField(_('End date'))
date_issue = models.DateField(_('Issue date'))
injected_lines = models.CharField(
_('Integrate injected lines'),
choices=[
('no', _('No')),
('period', _('Yes, only for the period')),
('all', _('Yes, all injected lines before the end of the period')),
],
default='no',
max_length=10,
)
def __str__(self):
return _('From %(start)s to %(end)s') % {
'start': date_format(self.date_start, 'd/m/Y'),
'end': date_format(self.date_end, 'd/m/Y'),
}
def generate(self, spool=True):
pool = self.pool_set.create(draft=True)
if spool and 'uwsgi' in sys.modules:
from lingo.invoicing.spooler import generate_invoices
tenant = getattr(connection, 'tenant', None)
transaction.on_commit(
lambda: generate_invoices.spool(
campaign_id=str(self.pk), pool_id=str(pool.pk), domain=getattr(tenant, 'domain_url', None)
)
)
return
pool.generate_invoices()
class Pool(models.Model):
campaign = models.ForeignKey(Campaign, on_delete=models.PROTECT)
draft = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
completed_at = models.DateTimeField(null=True)
status = models.CharField(
choices=[
('registered', _('Registered')),
('running', _('Running')),
('failed', _('Failed')),
('completed', _('Completed')),
],
default='registered',
max_length=100,
)
exception = models.TextField()
@property
def is_last(self):
return not self.campaign.pool_set.filter(created_at__gt=self.created_at).exists()
def generate_invoices(self):
from lingo.invoicing import utils
self.status = 'running'
self.save()
try:
# get agendas with pricing corresponding to the period
agendas = utils.get_agendas(pool=self)
# get subscribed users for each agenda, for the period
users = utils.get_users_from_subscriptions(agendas=agendas, pool=self)
# get invoice lines for all subscribed users, for each agenda in the corresponding period
lines = utils.get_all_invoice_lines(agendas=agendas, users=users, pool=self)
# and generate invoices
utils.generate_invoices_from_lines(agendas=agendas, all_lines=lines, pool=self)
except (RegieNotConfigured, ChronoError) as e:
self.status = 'failed'
self.exception = e.msg
except Exception:
self.status = 'failed'
self.exception = traceback.format_exc()
raise
finally:
if self.status == 'running':
self.status = 'completed'
self.completed_at = now()
self.save()
def promote(self):
if not self.is_last:
# not the last
raise PoolPromotionError('Pool too old')
if not self.draft:
# not a draft
raise PoolPromotionError('Pool is final')
if self.status != 'completed':
# not completed
raise PoolPromotionError('Pool is not completed')
final_pool = copy.deepcopy(self)
final_pool.pk = None
final_pool.draft = False
final_pool.status = 'registered'
final_pool.completed_at = None
final_pool.save()
if 'uwsgi' in sys.modules:
from lingo.invoicing.spooler import populate_from_draft
tenant = getattr(connection, 'tenant', None)
transaction.on_commit(
lambda: populate_from_draft.spool(
draft_pool_id=str(self.pk),
final_pool_id=str(final_pool.pk),
domain=getattr(tenant, 'domain_url', None),
)
)
return
final_pool.populate_from_draft(self)
def populate_from_draft(self, draft_pool):
try:
self.status = 'running'
self.save()
for invoice in draft_pool.draftinvoice_set.all().order_by('pk'):
final_invoice = copy.deepcopy(invoice)
final_invoice.__class__ = Invoice
final_invoice.pk = None
final_invoice.pool = self
final_invoice.set_number()
final_invoice.save()
for line in invoice.lines.all().order_by('pk'):
final_line = copy.deepcopy(line)
final_line.__class__ = InvoiceLine
final_line.pk = None
final_line.invoice = final_invoice
final_line.pool = self
final_line.error_status = ''
final_line.save()
for line in draft_pool.draftinvoiceline_set.filter(invoice__isnull=True).order_by('pk'):
final_line = copy.deepcopy(line)
final_line.__class__ = InvoiceLine
final_line.pk = None
final_line.error_status = ''
final_line.pool = self
final_line.save()
except Exception:
self.status = 'failed'
self.exception = traceback.format_exc()
raise
finally:
if self.status == 'running':
self.status = 'completed'
self.completed_at = now()
self.save()
class AbstractInvoice(models.Model):
label = models.CharField(_('Label'), max_length=300)
total_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
date_issue = models.DateField(_('Issue date'))
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
payer = models.CharField(_('Payer'), max_length=300)
created_at = models.DateTimeField(auto_now_add=True)
pool = models.ForeignKey(Pool, on_delete=models.PROTECT)
class Meta:
abstract = True
class DraftInvoice(AbstractInvoice):
pass
class Counter(models.Model):
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
name = models.CharField(max_length=128)
value = models.PositiveIntegerField(default=0)
class Meta:
unique_together = (('regie', 'name'),)
@classmethod
def get_count(cls, regie, name):
queryset = cls.objects.select_for_update()
with transaction.atomic():
counter, dummy = queryset.get_or_create(regie=regie, name=name)
counter.value += 1
counter.save()
return counter.value
class Invoice(AbstractInvoice):
number = models.PositiveIntegerField(default=0)
formatted_number = models.CharField(max_length=200)
def set_number(self):
self.number = Counter.get_count(
regie=self.regie,
name=self.regie.get_counter_name(self.created_at),
)
self.formatted_number = self.regie.format_number(self.created_at, self.number)
class InjectedLine(models.Model):
event_date = models.DateField()
slug = models.SlugField(max_length=250)
label = models.CharField(max_length=260)
quantity = models.FloatField()
unit_amount = models.DecimalField(max_digits=9, decimal_places=2)
total_amount = models.DecimalField(max_digits=9, decimal_places=2)
user_external_id = models.CharField(max_length=250)
payer_external_id = models.CharField(max_length=250)
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
class AbstractInvoiceLine(models.Model):
event_date = models.DateField()
slug = models.SlugField(max_length=250)
label = models.CharField(max_length=260)
quantity = models.FloatField()
unit_amount = models.DecimalField(max_digits=9, decimal_places=2)
total_amount = models.DecimalField(max_digits=9, decimal_places=2)
user_external_id = models.CharField(max_length=250)
user_name = models.CharField(max_length=250)
payer_external_id = models.CharField(max_length=250)
event = JSONField(default=dict)
pricing_data = JSONField(default=dict, encoder=DjangoJSONEncoder)
status = models.CharField(
max_length=10,
choices=[
('success', _('Success')),
('warning', _('Warning')),
('error', _('Error')),
],
)
pool = models.ForeignKey(Pool, on_delete=models.PROTECT)
class Meta:
abstract = True
def get_error_display(self):
if self.status == 'success':
return
error_class = str(self.pricing_data.get('error'))
error_details = self.pricing_data.get('error_details', {})
error_messages = {
'AgendaPricingNotFound': _('Agenda pricing not found'),
'CriteriaConditionNotFound': _('No matching criteria for category: %(category)s'),
'MultipleDefaultCriteriaCondition': _(
'Multiple default criteria found for category: %(category)s'
),
'PricingDataError': _('Impossible to determine a pricing for criterias: %(criterias)s'),
'PricingDataFormatError': _('Pricing is not a %(wanted)s: %(pricing)s'),
'PricingUnknownCheckStatusError': _('Unknown check status: %(status)s'),
'PricingEventNotCheckedError': _('Event is not checked'),
'PricingBookingNotCheckedError': _('Booking is not checked'),
'PricingMultipleBookingError': _('Multiple booking found'),
'PricingBookingCheckTypeError': _('Check type error: %(checktype_error)s'),
}
formats = {
'decimal': _('decimal'),
}
checktype_error = None
if error_details.get('reason'):
reasons = {
'not-found': _('not found'),
'wrong-kind': _('wrong kind (group: %(check_type_group)s, check type: %(check_type)s)'),
'not-configured': _(
'pricing not configured (group: %(check_type_group)s, check type: %(check_type)s)'
),
}
checktype_error = reasons.get(error_details['reason']) % {
'check_type': error_details.get('check_type'),
'check_type_group': error_details.get('check_type_group'),
}
return (
error_messages.get(error_class, '')
% {
'category': error_details.get('category'),
'criterias': ', '.join(
'%s (%s)' % (v, _('category: %s') % k)
for k, v in error_details.get('criterias', {}).items()
),
'pricing': error_details.get('pricing'),
'wanted': formats.get(error_details.get('wanted')),
'status': error_details.get('status'),
'checktype_error': checktype_error,
}
or error_class
)
class DraftInvoiceLine(AbstractInvoiceLine):
invoice = models.ForeignKey(DraftInvoice, on_delete=models.PROTECT, null=True, related_name='lines')
from_injected_line = models.ForeignKey(InjectedLine, on_delete=models.PROTECT, null=True)
class InvoiceLine(AbstractInvoiceLine):
invoice = models.ForeignKey(Invoice, on_delete=models.PROTECT, null=True, related_name='lines')
from_injected_line = models.ForeignKey(InjectedLine, on_delete=models.PROTECT, null=True)
error_status = models.CharField(
max_length=10,
choices=[
('ignored', _('Ignored')),
('fixed', _('Fixed')),
],
blank=True,
)