lingo/lingo/invoicing/models.py

296 lines
10 KiB
Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import sys
import traceback
from django.contrib.auth.models import Group
from django.contrib.postgres.fields import JSONField
from django.core.serializers.json import DjangoJSONEncoder
from django.db import connection, models, transaction
from django.utils.formats import date_format
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from lingo.agendas.chrono import ChronoError
from lingo.utils.misc import generate_slug
class RegieImportError(Exception):
    """Raised by Regie.import_json when the imported data cannot be applied."""
class RegieNotConfigured(Exception):
    """Error carrying a human-readable message in its ``msg`` attribute.

    Pool.generate_invoices catches this exception and stores ``msg`` on the
    pool instead of a full traceback.
    """

    def __init__(self, msg):
        # Only the attribute is set here; Exception.args is still populated
        # by BaseException.__new__ from the constructor arguments.
        self.msg = msg
class Regie(models.Model):
    """A regie, identified by a unique slug, with an optional cashier role.

    The cashier role is a Django auth ``Group``; it is exported and imported
    by name (see ``export_json`` / ``import_json``).
    """

    label = models.CharField(_('Label'), max_length=150)
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
    description = models.TextField(
        _('Description'), null=True, blank=True, help_text=_('Optional regie description.')
    )
    cashier_role = models.ForeignKey(
        Group,
        blank=True,
        null=True,
        default=None,
        related_name='+',
        verbose_name=_('Cashier Role'),
        on_delete=models.SET_NULL,
    )

    class Meta:
        ordering = ['label']

    def __str__(self):
        return self.label

    def save(self, *args, **kwargs):
        """Save the regie, filling in the slug with generate_slug() when it is empty."""
        if not self.slug:
            self.slug = generate_slug(self)
        super().save(*args, **kwargs)

    @property
    def base_slug(self):
        """Slugified label (presumably the base used by generate_slug -- confirm there)."""
        return slugify(self.label)

    def export_json(self):
        """Return a JSON-serializable dict describing this regie.

        The cashier role is exported as the group name, or None when unset.
        """
        return {
            'label': self.label,
            'slug': self.slug,
            'description': self.description,
            'permissions': {
                'cashier': self.cashier_role.name if self.cashier_role else None,
            },
        }

    @classmethod
    def import_json(cls, data):
        """Create or update a regie from a dict shaped like export_json() output.

        The regie is matched on ``data['slug']``; the remaining keys are used
        as field values. The input dict is not modified.

        Returns:
            A ``(created, regie)`` tuple.

        Raises:
            RegieImportError: if the cashier role name matches no group, or
                more than one group.
        """
        # Work on a copy: keys are popped/added below.
        data = copy.deepcopy(data)
        # Tolerate both a missing 'permissions' key and an explicit null.
        permissions = data.pop('permissions', None) or {}
        role_name = permissions.get('cashier')
        if role_name:
            try:
                data['cashier_role'] = Group.objects.get(name=role_name)
            except Group.DoesNotExist as exc:
                # NOTE: the attribute is ``DoesNotExist``; the previous spelling
                # ``DoesNotExists`` raised AttributeError instead of reaching here.
                raise RegieImportError('Missing role: %s' % role_name) from exc
            except Group.MultipleObjectsReturned as exc:
                raise RegieImportError('Multiple role exist with the name: %s' % role_name) from exc
        regie, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
        return created, regie
class Campaign(models.Model):
    """Invoicing campaign defined by a start date, an end date and an issue date."""

    date_start = models.DateField(_('Start date'))
    date_end = models.DateField(_('End date'))
    date_issue = models.DateField(_('Issue date'))

    def __str__(self):
        period = {
            'start': date_format(self.date_start, 'd/m/Y'),
            'end': date_format(self.date_end, 'd/m/Y'),
        }
        return _('From %(start)s to %(end)s') % period

    def generate(self, spool=True, draft=True):
        """Create a new pool for this campaign and generate its invoices.

        When ``spool`` is true and the ``uwsgi`` module is loaded, generation
        is handed to the spooler once the current transaction commits.
        Otherwise invoices are generated synchronously before returning.
        """
        pool = self.pool_set.create(draft=draft)

        if not spool or 'uwsgi' not in sys.modules:
            # No spooler requested or available: run inline.
            pool.generate_invoices()
            return

        # Imported lazily, only on the spooler path.
        from lingo.invoicing.spooler import generate_invoices

        tenant = getattr(connection, 'tenant', None)

        def spool_generation():
            return generate_invoices.spool(
                campaign_id=str(self.pk), pool_id=str(pool.pk), domain=getattr(tenant, 'domain_url', None)
            )

        # Defer until commit so the spooled job runs after the pool row is committed.
        transaction.on_commit(spool_generation)
class Pool(models.Model):
    """One invoice-generation run attached to a campaign.

    ``status`` tracks the run ('registered', 'running', 'failed',
    'completed') and ``exception`` holds the failure message or traceback.
    """

    campaign = models.ForeignKey(Campaign, on_delete=models.PROTECT)
    draft = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True)
    status = models.CharField(
        choices=[
            ('registered', _('Registered')),
            ('running', _('Running')),
            ('failed', _('Failed')),
            ('completed', _('Completed')),
        ],
        default='registered',
        max_length=100,
    )
    exception = models.TextField()

    def generate_invoices(self):
        """Run invoice generation for this pool and record the outcome.

        The pool is saved as 'running' first, and saved again at the end with
        its final status and ``completed_at`` set.

        RegieNotConfigured and ChronoError are recorded on the pool (their
        ``msg`` attribute is stored) and not propagated. Any other Exception
        is recorded as a traceback and re-raised.
        """
        from lingo.invoicing import utils

        self.status = 'running'
        self.save()
        try:
            # agendas with pricing corresponding to the period
            pool_agendas = utils.get_agendas(pool=self)
            # subscribed users for each agenda, for the period
            subscribed_user_ids = utils.get_users_from_subscriptions(agendas=pool_agendas, pool=self)
            # invoice lines for all subscribed users, for each agenda in the corresponding period
            invoice_lines = utils.get_all_invoice_lines(
                agendas=pool_agendas, user_external_ids=subscribed_user_ids, pool=self
            )
            # finally, build the invoices out of those lines
            utils.generate_invoices_from_lines(agendas=pool_agendas, all_lines=invoice_lines, pool=self)
        except (RegieNotConfigured, ChronoError) as known_error:
            # NOTE(review): relies on ChronoError exposing ``msg`` like
            # RegieNotConfigured does -- confirm in lingo.agendas.chrono.
            self.status = 'failed'
            self.exception = known_error.msg
        except Exception:
            self.status = 'failed'
            self.exception = traceback.format_exc()
            raise
        finally:
            # Status still at 'running' here means no handler marked a failure.
            if self.status == 'running':
                self.status = 'completed'
            self.completed_at = now()
            self.save()
class AbstractInvoice(models.Model):
    """Abstract base holding the fields shared by DraftInvoice and Invoice."""

    label = models.CharField(_('Label'), max_length=300)
    total_amount = models.DecimalField(
        max_digits=9,
        decimal_places=2,
        default=0,
    )
    date_issue = models.DateField(_('Issue date'))
    # PROTECT: a regie or pool cannot be deleted while invoices reference it.
    regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
    payer = models.CharField(_('Payer'), max_length=300)
    pool = models.ForeignKey(Pool, on_delete=models.PROTECT)

    class Meta:
        abstract = True
class DraftInvoice(AbstractInvoice):
    """Concrete invoice model for drafts; adds no field to AbstractInvoice."""
class Invoice(AbstractInvoice):
    """Concrete invoice model; adds no field to AbstractInvoice."""
class InjectedLine(models.Model):
    """A dated line attached to a regie, with amounts and user/payer identifiers.

    Draft and final invoice lines can reference it through their
    ``from_injected_line`` foreign key.
    """

    event_date = models.DateField()
    slug = models.SlugField(max_length=250)
    label = models.CharField(max_length=260)
    quantity = models.FloatField()
    unit_amount = models.DecimalField(
        max_digits=9,
        decimal_places=2,
    )
    total_amount = models.DecimalField(
        max_digits=9,
        decimal_places=2,
    )
    user_external_id = models.CharField(max_length=250)
    payer_external_id = models.CharField(max_length=250)
    # PROTECT: the regie cannot be deleted while injected lines reference it.
    regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
class AbstractInvoiceLine(models.Model):
    """Abstract base holding the fields shared by DraftInvoiceLine and InvoiceLine.

    ``pricing_data`` is a JSON document; when ``status`` is not 'success',
    ``get_error_display`` reads its 'error' and 'error_details' keys.
    """

    slug = models.SlugField(max_length=250)
    label = models.CharField(max_length=260)
    quantity = models.FloatField()
    unit_amount = models.DecimalField(max_digits=9, decimal_places=2)
    total_amount = models.DecimalField(max_digits=9, decimal_places=2)
    user_external_id = models.CharField(max_length=250)
    payer_external_id = models.CharField(max_length=250)
    event = JSONField(default=dict)
    pricing_data = JSONField(default=dict, encoder=DjangoJSONEncoder)
    status = models.CharField(
        max_length=10,
        choices=[
            ('success', _('Success')),
            ('warning', _('Warning')),
            ('error', _('Error')),
        ],
    )
    pool = models.ForeignKey(Pool, on_delete=models.PROTECT)

    class Meta:
        abstract = True

    def get_error_display(self):
        """Return a translated description of the pricing error on this line.

        Returns:
            None when ``status`` is 'success'. Otherwise the message matching
            ``pricing_data['error']``, filled in with values taken from
            ``pricing_data['error_details']``. When no message is known for
            the error, the error class name itself is returned.
        """
        if self.status == 'success':
            return
        error_class = str(self.pricing_data.get('error'))
        # ``or {}`` also covers an explicit null stored under 'error_details'.
        error_details = self.pricing_data.get('error_details') or {}
        error_messages = {
            'AgendaPricingNotFound': _('Agenda pricing not found'),
            'CriteriaConditionNotFound': _('No matching criteria for category: %(category)s'),
            'MultipleDefaultCriteriaCondition': _(
                'Multiple default criteria found for category: %(category)s'
            ),
            'PricingDataError': _('Impossible to determine a pricing for criterias: %(criterias)s'),
            'PricingDataFormatError': _('Pricing is not a %(wanted)s: %(pricing)s'),
            'PricingUnknownCheckStatusError': _('Unknown check status: %(status)s'),
            'PricingEventNotCheckedError': _('Event is not checked'),
            'PricingBookingNotCheckedError': _('Booking is not checked'),
            'PricingMultipleBookingError': _('Multiple booking found'),
            'PricingBookingCheckTypeError': _('Check type error: %(checktype_error)s'),
        }
        formats = {
            'decimal': _('decimal'),
        }
        checktype_error = None
        reason = error_details.get('reason')
        if reason:
            reasons = {
                'not-found': _('not found'),
                'wrong-kind': _('wrong kind (group: %(check_type_group)s, check type: %(check_type)s)'),
                'not-configured': _(
                    'pricing not configured (group: %(check_type_group)s, check type: %(check_type)s)'
                ),
            }
            reason_template = reasons.get(reason)
            if reason_template is None:
                # Unknown reason: show it verbatim instead of failing on
                # ``None % {...}`` (TypeError).
                checktype_error = reason
            else:
                checktype_error = reason_template % {
                    'check_type': error_details.get('check_type'),
                    'check_type_group': error_details.get('check_type_group'),
                }
        criterias = error_details.get('criterias') or {}
        return (
            error_messages.get(error_class, '')
            % {
                'category': error_details.get('category'),
                'criterias': ', '.join(
                    '%s (%s)' % (v, _('category: %s') % k) for k, v in criterias.items()
                ),
                'pricing': error_details.get('pricing'),
                'wanted': formats.get(error_details.get('wanted')),
                'status': error_details.get('status'),
                'checktype_error': checktype_error,
            }
            # An unknown error class formats to '' and falls through to its name.
            or error_class
        )
class DraftInvoiceLine(AbstractInvoiceLine):
    """Invoice line that can be attached to a DraftInvoice (reverse accessor: ``lines``)."""

    invoice = models.ForeignKey(
        DraftInvoice,
        on_delete=models.PROTECT,
        null=True,
        related_name='lines',
    )
    # Optional link to the InjectedLine this line refers to.
    from_injected_line = models.ForeignKey(
        InjectedLine,
        on_delete=models.PROTECT,
        null=True,
    )
class InvoiceLine(AbstractInvoiceLine):
    """Invoice line that can be attached to an Invoice (reverse accessor: ``lines``)."""

    invoice = models.ForeignKey(
        Invoice,
        on_delete=models.PROTECT,
        null=True,
        related_name='lines',
    )
    # Optional link to the InjectedLine this line refers to.
    from_injected_line = models.ForeignKey(
        InjectedLine,
        on_delete=models.PROTECT,
        null=True,
    )