wip/71528-generate-draft-invoices (#71528) #1
|
@ -10,6 +10,9 @@ recursive-include lingo/templates *.html *.txt
|
|||
recursive-include lingo/manager/templates *.html *.txt
|
||||
recursive-include lingo/pricing/templates *.html *.txt
|
||||
|
||||
# sql (migrations)
|
||||
recursive-include lingo/invoicing/sql *.sql
|
||||
|
||||
include COPYING README
|
||||
include MANIFEST.in
|
||||
include VERSION
|
||||
|
|
|
@ -132,14 +132,41 @@ def get_events(event_slugs, error_message=None, error_message_with_details=None)
|
|||
return result['data']
|
||||
|
||||
|
||||
def get_subscriptions(agenda_slug, user_external_id):
|
||||
result = get_chrono_json(
|
||||
'api/agenda/%s/subscription/?user_external_id=%s' % (agenda_slug, user_external_id)
|
||||
)
|
||||
if not result or not result.get('data'):
|
||||
def get_subscriptions(agenda_slug, user_external_id=None, date_start=None, date_end=None):
|
||||
url = 'api/agenda/%s/subscription/' % agenda_slug
|
||||
params = {}
|
||||
if user_external_id:
|
||||
params['user_external_id'] = user_external_id
|
||||
if date_start:
|
||||
params['date_start'] = date_start
|
||||
if date_end:
|
||||
params['date_end'] = date_end
|
||||
if params:
|
||||
url += '?%s' % '&'.join(['%s=%s' % (k, v) for k, v in params.items()])
|
||||
result = get_chrono_json(url)
|
||||
if not result:
|
||||
raise ChronoError(_('Unable to get subscription details'))
|
||||
if result.get('err'):
|
||||
raise ChronoError(_('Unable to get subscription details (%s)') % result['err_desc'])
|
||||
if not result.get('data'):
|
||||
if 'data' not in result:
|
||||
raise ChronoError(_('Unable to get subscription details'))
|
||||
return result['data']
|
||||
|
||||
|
||||
def get_check_status(agenda_slugs, user_external_id, date_start, date_end):
|
||||
result = get_chrono_json(
|
||||
'api/agendas/events/check-status/?user_external_id=%s&agendas=%s&date_start=%s&date_end=%s'
|
||||
% (
|
||||
user_external_id,
|
||||
','.join(agenda_slugs),
|
||||
date_start,
|
||||
date_end,
|
||||
)
|
||||
)
|
||||
if not result:
|
||||
raise ChronoError(_('Unable to get check status'))
|
||||
if result.get('err'):
|
||||
raise ChronoError(_('Unable to get check status (%s)') % result['err_desc'])
|
||||
if 'data' not in result:
|
||||
raise ChronoError(_('Unable to get check status'))
|
||||
return result['data']
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
# lingo - payment and billing system
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
|
||||
from lingo.invoicing.utils import generate_invoices
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Generate invoicing for a period'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('date_start')
|
||||
parser.add_argument('date_end')
|
||||
|
||||
def handle(self, *args, **options):
|
||||
try:
|
||||
date_start = datetime.datetime.fromisoformat(options['date_start']).date()
|
||||
except ValueError:
|
||||
raise CommandError('Bad value "%s" for date_start' % options['date_start'])
|
||||
try:
|
||||
date_end = datetime.datetime.fromisoformat(options['date_end']).date()
|
||||
except ValueError:
|
||||
raise CommandError('Bad value "%s" for date_end' % options['date_end'])
|
||||
|
||||
generate_invoices(date_start=date_start, date_end=date_end)
|
||||
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS('Invoicing generation OK (start: %s, end: %s)' % (date_start, date_end))
|
||||
)
|
|
@ -0,0 +1,124 @@
|
|||
import django.contrib.postgres.fields.jsonb
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('invoicing', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Invoice',
|
||||
fields=[
|
||||
(
|
||||
'id',
|
||||
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
|
||||
),
|
||||
('label', models.CharField(max_length=300, verbose_name='Label')),
|
||||
('total_amount', models.DecimalField(decimal_places=2, max_digits=9, default=0)),
|
||||
('date_issue', models.DateField(verbose_name='Issue date')),
|
||||
('payer', models.CharField(max_length=300, verbose_name='Payer')),
|
||||
(
|
||||
'regie',
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.PROTECT,
|
||||
to='invoicing.Regie',
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='InvoiceLine',
|
||||
fields=[
|
||||
(
|
||||
'id',
|
||||
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
|
||||
),
|
||||
('slug', models.SlugField(max_length=250)),
|
||||
('label', models.CharField(max_length=260)),
|
||||
('quantity', models.FloatField()),
|
||||
('unit_amount', models.DecimalField(decimal_places=2, max_digits=9)),
|
||||
('total_amount', models.DecimalField(decimal_places=2, max_digits=9)),
|
||||
('user_external_id', models.CharField(max_length=250)),
|
||||
('payer_external_id', models.CharField(max_length=250)),
|
||||
('event', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
|
||||
('pricing_data', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
|
||||
(
|
||||
'status',
|
||||
models.CharField(choices=[('success', 'Success'), ('error', 'Error')], max_length=10),
|
||||
),
|
||||
(
|
||||
'invoice',
|
||||
models.ForeignKey(
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.PROTECT,
|
||||
to='invoicing.Invoice',
|
||||
related_name='lines',
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='DraftInvoice',
|
||||
fields=[
|
||||
(
|
||||
'id',
|
||||
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
|
||||
),
|
||||
('label', models.CharField(max_length=300, verbose_name='Label')),
|
||||
('total_amount', models.DecimalField(decimal_places=2, max_digits=9, default=0)),
|
||||
('date_issue', models.DateField(verbose_name='Issue date')),
|
||||
('payer', models.CharField(max_length=300, verbose_name='Payer')),
|
||||
(
|
||||
'regie',
|
||||
models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='invoicing.Regie'),
|
||||
),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='DraftInvoiceLine',
|
||||
fields=[
|
||||
(
|
||||
'id',
|
||||
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
|
||||
),
|
||||
('slug', models.SlugField(max_length=250)),
|
||||
('label', models.CharField(max_length=260)),
|
||||
('quantity', models.FloatField()),
|
||||
('unit_amount', models.DecimalField(decimal_places=2, max_digits=9)),
|
||||
('total_amount', models.DecimalField(decimal_places=2, max_digits=9)),
|
||||
('user_external_id', models.CharField(max_length=250)),
|
||||
('payer_external_id', models.CharField(max_length=250)),
|
||||
('event', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
|
||||
('pricing_data', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
|
||||
(
|
||||
'status',
|
||||
models.CharField(choices=[('success', 'Success'), ('error', 'Error')], max_length=10),
|
||||
),
|
||||
(
|
||||
'invoice',
|
||||
models.ForeignKey(
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.PROTECT,
|
||||
to='invoicing.DraftInvoice',
|
||||
related_name='lines',
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
]
|
|
@ -0,0 +1,27 @@
|
|||
import os
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
with open(
|
||||
os.path.join(
|
||||
os.path.dirname(os.path.realpath(__file__)),
|
||||
'..',
|
||||
'sql',
|
||||
'invoice_triggers_for_amount.sql',
|
||||
)
|
||||
) as sql_file:
|
||||
sql_triggers = sql_file.read()
|
||||
|
||||
|
||||
sql_forwards = sql_triggers
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('invoicing', '0002_invoice'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunSQL(sql=sql_forwards, reverse_sql=migrations.RunSQL.noop),
|
||||
]
|
|
@ -17,6 +17,7 @@
|
|||
import copy
|
||||
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.postgres.fields import JSONField
|
||||
from django.db import models
|
||||
from django.utils.text import slugify
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
@ -84,3 +85,53 @@ class Regie(models.Model):
|
|||
|
||||
regie, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
|
||||
return created, regie
|
||||
|
||||
|
||||
class AbstractInvoice(models.Model):
|
||||
label = models.CharField(_('Label'), max_length=300)
|
||||
total_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
|
||||
date_issue = models.DateField(_('Issue date'))
|
||||
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
|
||||
payer = models.CharField(_('Payer'), max_length=300)
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
|
||||
class DraftInvoice(AbstractInvoice):
|
||||
pass
|
||||
|
||||
|
||||
class Invoice(AbstractInvoice):
|
||||
pass
|
||||
|
||||
|
||||
class AbstractInvoiceLine(models.Model):
|
||||
slug = models.SlugField(max_length=250)
|
||||
label = models.CharField(max_length=260)
|
||||
quantity = models.FloatField()
|
||||
unit_amount = models.DecimalField(max_digits=9, decimal_places=2)
|
||||
total_amount = models.DecimalField(max_digits=9, decimal_places=2)
|
||||
|
||||
user_external_id = models.CharField(max_length=250)
|
||||
payer_external_id = models.CharField(max_length=250)
|
||||
event = JSONField(default=dict)
|
||||
pricing_data = JSONField(default=dict)
|
||||
status = models.CharField(
|
||||
max_length=10,
|
||||
choices=[
|
||||
('success', _('Success')),
|
||||
('error', _('Error')),
|
||||
],
|
||||
)
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
|
||||
class DraftInvoiceLine(AbstractInvoiceLine):
|
||||
invoice = models.ForeignKey(DraftInvoice, on_delete=models.PROTECT, null=True, related_name='lines')
|
||||
|
||||
|
||||
class InvoiceLine(AbstractInvoiceLine):
|
||||
invoice = models.ForeignKey(Invoice, on_delete=models.PROTECT, null=True, related_name='lines')
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
CREATE OR REPLACE FUNCTION set_invoice_total_amount() RETURNS TRIGGER AS $$
|
||||
DECLARE
|
||||
invoice_ids integer[];
|
||||
BEGIN
|
||||
IF TG_OP = 'INSERT' THEN
|
||||
invoice_ids := ARRAY[NEW.invoice_id];
|
||||
ELSIF TG_OP = 'DELETE' THEN
|
||||
invoice_ids := ARRAY[OLD.invoice_id];
|
||||
ELSIF TG_OP = 'UPDATE' THEN
|
||||
invoice_ids := ARRAY[NEW.invoice_id, OLD.invoice_id];
|
||||
END IF;
|
||||
|
||||
EXECUTE 'UPDATE ' || substring(TG_TABLE_NAME for length(TG_TABLE_NAME) - 4) || ' i
|
||||
SET total_amount = COALESCE(
|
||||
(
|
||||
SELECT SUM(l.total_amount)
|
||||
FROM ' || TG_TABLE_NAME || ' l
|
||||
WHERE l.invoice_id = i.id
|
||||
AND l.status = ''success''
|
||||
), 0
|
||||
)
|
||||
WHERE id = ANY($1);' USING invoice_ids;
|
||||
|
||||
IF TG_OP = 'DELETE' THEN
|
||||
RETURN OLD;
|
||||
ELSE
|
||||
RETURN NEW;
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
|
||||
DROP TRIGGER IF EXISTS set_draftinvoice_total_amount_trg ON invoicing_draftinvoiceline;
|
||||
CREATE TRIGGER set_draftinvoice_total_amount_trg
|
||||
AFTER INSERT OR UPDATE OF total_amount, invoice_id, status OR DELETE ON invoicing_draftinvoiceline
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE set_invoice_total_amount();
|
||||
|
||||
|
||||
DROP TRIGGER IF EXISTS set_invoice_total_amount_trg ON invoicing_invoiceline;
|
||||
CREATE TRIGGER set_invoice_total_amount_trg
|
||||
AFTER INSERT OR UPDATE OF total_amount, invoice_id, status OR DELETE ON invoicing_invoiceline
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE set_invoice_total_amount();
|
|
@ -0,0 +1,246 @@
|
|||
# lingo - payment and billing system
|
||||
# Copyright (C) 2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import collections
|
||||
import datetime
|
||||
|
||||
from django.test.client import RequestFactory
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from lingo.agendas.chrono import get_check_status, get_subscriptions
|
||||
from lingo.agendas.models import Agenda
|
||||
from lingo.invoicing.models import DraftInvoice, DraftInvoiceLine
|
||||
from lingo.pricing.models import AgendaPricing, AgendaPricingNotFound, PricingError
|
||||
|
||||
|
||||
def get_agendas(date_start, date_end):
|
||||
agendas_pricings = AgendaPricing.objects.filter(flat_fee_schedule=False).extra(
|
||||
where=["(date_start, date_end) OVERLAPS (%s, %s)"], params=[date_start, date_end]
|
||||
)
|
||||
return Agenda.objects.filter(pk__in=agendas_pricings.values('agendas')).order_by('pk')
|
||||
|
||||
|
||||
def get_all_subscriptions(agendas, date_start, date_end):
|
||||
# result:
|
||||
# {
|
||||
# 'user_id': {
|
||||
# 'agenda_slug': [sub1, sub2, ...],
|
||||
# 'agenda_slug2': [sub1, sub2, ...],
|
||||
# },
|
||||
# ...
|
||||
# }
|
||||
all_subscriptions = {}
|
||||
for agenda in agendas:
|
||||
subscriptions = get_subscriptions(agenda_slug=agenda.slug, date_start=date_start, date_end=date_end)
|
||||
for subscription in subscriptions:
|
||||
if subscription['user_external_id'] not in all_subscriptions:
|
||||
all_subscriptions[subscription['user_external_id']] = {}
|
||||
user_subs = all_subscriptions[subscription['user_external_id']]
|
||||
if agenda.slug not in user_subs:
|
||||
user_subs[agenda.slug] = []
|
||||
user_subs[agenda.slug].append(subscription)
|
||||
return all_subscriptions
|
||||
|
||||
|
||||
def get_invoice_lines_for_user(
|
||||
agendas, agendas_pricings, user_external_id, subscriptions, date_start, date_end
|
||||
):
|
||||
def get_agenda_pricing(agendas_pricings_for_agenda, date_event):
|
||||
# same logic as AgendaPricing.get_agenda_pricing
|
||||
for agenda_pricing in agendas_pricings_for_agenda:
|
||||
if agenda_pricing.date_start > date_event:
|
||||
continue
|
||||
if agenda_pricing.date_end <= date_event:
|
||||
continue
|
||||
return agenda_pricing
|
||||
raise AgendaPricingNotFound
|
||||
|
||||
def get_subscription(subscriptions, event_date):
|
||||
# get subscription matching event_date
|
||||
for subscription in subscriptions:
|
||||
sub_start_date = datetime.date.fromisoformat(subscription['date_start'])
|
||||
sub_end_date = datetime.date.fromisoformat(subscription['date_end'])
|
||||
if sub_start_date > event_date:
|
||||
continue
|
||||
if sub_end_date <= event_date:
|
||||
continue
|
||||
return subscription
|
||||
|
||||
if not agendas or not subscriptions:
|
||||
return []
|
||||
|
||||
agendas_pricings_by_agendas = collections.defaultdict(list)
|
||||
for agenda_pricing in agendas_pricings:
|
||||
if agenda_pricing.flat_fee_schedule:
|
||||
continue
|
||||
for agenda in agenda_pricing.agendas.all():
|
||||
agendas_pricings_by_agendas[agenda.slug].append(agenda_pricing)
|
||||
|
||||
# get check status for user_external_id, on agendas, for the period
|
||||
check_status_list = get_check_status(
|
||||
agenda_slugs=list(subscriptions.keys()),
|
||||
user_external_id=user_external_id,
|
||||
date_start=date_start,
|
||||
date_end=date_end,
|
||||
)
|
||||
request = RequestFactory().get('/') # XXX
|
||||
agendas_by_slug = {a.slug: a for a in agendas}
|
||||
|
||||
# build lines from check status
|
||||
lines = []
|
||||
for check_status in check_status_list:
|
||||
serialized_event = check_status['event']
|
||||
event_date = datetime.datetime.fromisoformat(serialized_event['start_datetime']).date()
|
||||
event_slug = '%s@%s' % (serialized_event['agenda'], serialized_event['slug'])
|
||||
if serialized_event['agenda'] not in subscriptions:
|
||||
# should not happen, check-status endpoint is based on real subscriptions
|
||||
continue
|
||||
serialized_subscription = get_subscription(
|
||||
subscriptions=subscriptions[serialized_event['agenda']],
|
||||
event_date=event_date,
|
||||
)
|
||||
if not serialized_subscription:
|
||||
# should not happen, check-status endpoint is based on real subscriptions
|
||||
continue
|
||||
|
||||
agenda = agendas_by_slug[serialized_event['agenda']]
|
||||
try:
|
||||
agenda_pricing = get_agenda_pricing(agendas_pricings_by_agendas.get(agenda.slug), event_date)
|
||||
pricing_data = agenda_pricing.get_pricing_data_for_event(
|
||||
request=request,
|
||||
agenda=agenda,
|
||||
event=serialized_event,
|
||||
subscription=serialized_subscription,
|
||||
check_status=check_status['check_status'],
|
||||
booking=check_status['booking'],
|
||||
user_external_id=user_external_id,
|
||||
adult_external_id=user_external_id, # XXX
|
||||
)
|
||||
except AgendaPricingNotFound:
|
||||
# no pricing, no invoice
|
||||
# can happen if pricing model defined only on a part of the requested period
|
||||
# XXX error, warning, or ignore ?
|
||||
continue
|
||||
except PricingError as e:
|
||||
# XXX explicit each error
|
||||
# XXX and log context ?
|
||||
pricing_error = {'error': e.details}
|
||||
lines.append(
|
||||
DraftInvoiceLine(
|
||||
slug=event_slug,
|
||||
label=serialized_event['label'],
|
||||
quantity=0,
|
||||
unit_amount=0,
|
||||
total_amount=0,
|
||||
user_external_id=user_external_id,
|
||||
payer_external_id=user_external_id, # XXX
|
||||
event=serialized_event,
|
||||
pricing_data=pricing_error,
|
||||
status='error',
|
||||
)
|
||||
)
|
||||
else:
|
||||
# XXX log all context !
|
||||
lines.append(
|
||||
DraftInvoiceLine(
|
||||
slug=event_slug,
|
||||
label=serialized_event['label'],
|
||||
quantity=1,
|
||||
unit_amount=pricing_data['pricing'],
|
||||
total_amount=pricing_data['pricing'],
|
||||
user_external_id=user_external_id,
|
||||
payer_external_id=user_external_id, # XXX
|
||||
event=serialized_event,
|
||||
pricing_data=pricing_data,
|
||||
status='success',
|
||||
)
|
||||
)
|
||||
DraftInvoiceLine.objects.bulk_create(lines)
|
||||
return lines
|
||||
|
||||
|
||||
def get_all_invoice_lines(agendas, subscriptions, date_start, date_end):
|
||||
agendas_pricings = (
|
||||
AgendaPricing.objects.filter(flat_fee_schedule=False)
|
||||
.extra(where=["(date_start, date_end) OVERLAPS (%s, %s)"], params=[date_start, date_end])
|
||||
.prefetch_related('agendas', 'pricing__criterias', 'pricing__categories')
|
||||
)
|
||||
|
||||
lines = []
|
||||
for user_external_id, user_subs in subscriptions.items():
|
||||
# generate lines for each user
|
||||
lines += get_invoice_lines_for_user(
|
||||
agendas=agendas,
|
||||
agendas_pricings=agendas_pricings,
|
||||
user_external_id=user_external_id,
|
||||
subscriptions=user_subs,
|
||||
date_start=date_start,
|
||||
date_end=date_end,
|
||||
)
|
||||
return lines
|
||||
|
||||
|
||||
def generate_invoices_from_lines(agendas, all_lines, date_start, date_end):
|
||||
agendas_by_slug = {a.slug: a for a in agendas}
|
||||
|
||||
# regroup lines by regie, and by payer_external_id (payer)
|
||||
lines_by_regie = {}
|
||||
for line in all_lines:
|
||||
if line.status != 'success':
|
||||
# ignore lines in error
|
||||
continue
|
||||
agenda_slug = line.event['agenda']
|
||||
if agenda_slug not in agendas_by_slug:
|
||||
# should not happen
|
||||
continue
|
||||
regie = agendas_by_slug[agenda_slug].regie
|
||||
if not regie:
|
||||
# XXX what should we do if regie is not configured ?
|
||||
continue
|
||||
if regie.pk not in lines_by_regie:
|
||||
lines_by_regie[regie.pk] = {}
|
||||
regie_subs = lines_by_regie[regie.pk]
|
||||
if line.payer_external_id not in regie_subs:
|
||||
regie_subs[line.payer_external_id] = []
|
||||
regie_subs[line.payer_external_id].append(line)
|
||||
|
||||
# generate invoices by regie and by payer_external_id (payer)
|
||||
invoices = []
|
||||
for regie_id, regie_subs in lines_by_regie.items():
|
||||
for payer_external_id, adult_lines in regie_subs.items():
|
||||
invoice = DraftInvoice.objects.create(
|
||||
label=_('Invoice from %s to %s') % (date_start, date_end - datetime.timedelta(days=1)),
|
||||
date_issue=date_end, # XXX
|
||||
regie_id=regie_id,
|
||||
payer=payer_external_id,
|
||||
)
|
||||
DraftInvoiceLine.objects.filter(pk__in=[line.pk for line in adult_lines]).update(invoice=invoice)
|
||||
invoices.append(invoice)
|
||||
|
||||
return invoices
|
||||
|
||||
|
||||
def generate_invoices(date_start, date_end):
|
||||
# get agendas with pricing corresponding to the period
|
||||
agendas = get_agendas(date_start=date_start, date_end=date_end)
|
||||
# get subscriptions for each agenda, for the period
|
||||
subscriptions = get_all_subscriptions(agendas=agendas, date_start=date_start, date_end=date_end)
|
||||
# get invoice lines for all subscribed users, for each agenda in the corresponding period
|
||||
lines = get_all_invoice_lines(
|
||||
agendas=agendas, subscriptions=subscriptions, date_start=date_start, date_end=date_end
|
||||
)
|
||||
# and generate invoices
|
||||
generate_invoices_from_lines(agendas=agendas, all_lines=lines, date_start=date_start, date_end=date_end)
|
|
@ -507,12 +507,16 @@ class AgendaPricing(models.Model):
|
|||
def compute_pricing(self, context):
|
||||
criterias = {}
|
||||
categories = []
|
||||
# for each category (ordered)
|
||||
for category in self.pricing.categories.all().order_by('pricingcriteriacategory__order'):
|
||||
# for each category
|
||||
for category in self.pricing.categories.all():
|
||||
criterias[category.slug] = None
|
||||
categories.append(category.slug)
|
||||
# find the first matching criteria (criterias are ordered)
|
||||
for criteria in self.pricing.criterias.filter(category=category, default=False):
|
||||
for criteria in self.pricing.criterias.all():
|
||||
if criteria.category_id != category.pk:
|
||||
continue
|
||||
if criteria.default:
|
||||
continue
|
||||
condition = criteria.compute_condition(context)
|
||||
if condition:
|
||||
criterias[category.slug] = criteria.slug
|
||||
|
@ -520,7 +524,9 @@ class AgendaPricing(models.Model):
|
|||
if criterias[category.slug] is not None:
|
||||
continue
|
||||
# if no match, take default criteria if only once defined
|
||||
default_criterias = self.pricing.criterias.filter(category=category, default=True)
|
||||
default_criterias = [
|
||||
c for c in self.pricing.criterias.all() if c.default and c.category_id == category.pk
|
||||
]
|
||||
if len(default_criterias) > 1:
|
||||
raise MultipleDefaultCriteriaCondition(details={'category': category.slug})
|
||||
if not default_criterias:
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import datetime
|
||||
import json
|
||||
from unittest import mock
|
||||
|
||||
|
@ -8,6 +9,7 @@ from requests.models import Response
|
|||
from lingo.agendas.chrono import (
|
||||
ChronoError,
|
||||
collect_agenda_data,
|
||||
get_check_status,
|
||||
get_event,
|
||||
get_events,
|
||||
get_subscriptions,
|
||||
|
@ -282,12 +284,12 @@ def test_get_events():
|
|||
def test_get_subscriptions_no_service(settings):
|
||||
settings.KNOWN_SERVICES = {}
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_subscriptions('foo', 'user:1')
|
||||
get_subscriptions(agenda_slug='foo')
|
||||
assert str(e.value) == 'Unable to get subscription details'
|
||||
|
||||
settings.KNOWN_SERVICES = {'other': []}
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_subscriptions('foo', 'user:1')
|
||||
get_subscriptions(agenda_slug='foo')
|
||||
assert str(e.value) == 'Unable to get subscription details'
|
||||
|
||||
|
||||
|
@ -295,7 +297,7 @@ def test_get_subscriptions():
|
|||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.side_effect = ConnectionError()
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_subscriptions('foo', 'user:1')
|
||||
get_subscriptions(agenda_slug='foo')
|
||||
assert str(e.value) == 'Unable to get subscription details'
|
||||
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
|
@ -303,7 +305,7 @@ def test_get_subscriptions():
|
|||
mock_resp.status_code = 500
|
||||
requests_get.return_value = mock_resp
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_subscriptions('foo', 'user:1')
|
||||
get_subscriptions(agenda_slug='foo')
|
||||
assert str(e.value) == 'Unable to get subscription details'
|
||||
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
|
@ -311,25 +313,148 @@ def test_get_subscriptions():
|
|||
mock_resp.status_code = 404
|
||||
requests_get.return_value = mock_resp
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_subscriptions('foo', 'user:1')
|
||||
get_subscriptions(agenda_slug='foo')
|
||||
assert str(e.value) == 'Unable to get subscription details'
|
||||
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_subscriptions('foo', 'user:1')
|
||||
get_subscriptions(agenda_slug='foo')
|
||||
assert str(e.value) == 'Unable to get subscription details'
|
||||
|
||||
data = {'data': []}
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_subscriptions('foo', 'user:1')
|
||||
assert str(e.value) == 'Unable to get subscription details'
|
||||
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?user_external_id=user:1',)
|
||||
assert get_subscriptions(agenda_slug='foo') == []
|
||||
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/',)
|
||||
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
|
||||
|
||||
data = {'data': ['foo', 'bar']}
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
|
||||
assert get_subscriptions('foo', 'user:1') == ['foo', 'bar']
|
||||
assert get_subscriptions(agenda_slug='foo') == ['foo', 'bar']
|
||||
|
||||
data = {'data': []}
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
|
||||
get_subscriptions(agenda_slug='foo', user_external_id='user:1')
|
||||
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?user_external_id=user:1',)
|
||||
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
|
||||
requests_get.reset_mock()
|
||||
get_subscriptions(agenda_slug='foo', date_start=datetime.date(2022, 9, 1))
|
||||
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?date_start=2022-09-01',)
|
||||
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
|
||||
requests_get.reset_mock()
|
||||
get_subscriptions(agenda_slug='foo', date_end=datetime.date(2022, 10, 1))
|
||||
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?date_end=2022-10-01',)
|
||||
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
|
||||
requests_get.reset_mock()
|
||||
get_subscriptions(
|
||||
agenda_slug='foo',
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
assert requests_get.call_args_list[0][0] == (
|
||||
'api/agenda/foo/subscription/?user_external_id=user:1&date_start=2022-09-01&date_end=2022-10-01',
|
||||
)
|
||||
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
|
||||
|
||||
|
||||
def test_get_check_status_no_service(settings):
|
||||
settings.KNOWN_SERVICES = {}
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_check_status(
|
||||
agenda_slugs=['foo'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
assert str(e.value) == 'Unable to get check status'
|
||||
|
||||
settings.KNOWN_SERVICES = {'other': []}
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_check_status(
|
||||
agenda_slugs=['foo'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
assert str(e.value) == 'Unable to get check status'
|
||||
|
||||
|
||||
def test_get_check_status():
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.side_effect = ConnectionError()
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_check_status(
|
||||
agenda_slugs=['foo', 'bar'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
assert str(e.value) == 'Unable to get check status'
|
||||
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
mock_resp = Response()
|
||||
mock_resp.status_code = 500
|
||||
requests_get.return_value = mock_resp
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_check_status(
|
||||
agenda_slugs=['foo', 'bar'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
assert str(e.value) == 'Unable to get check status'
|
||||
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
mock_resp = Response()
|
||||
mock_resp.status_code = 404
|
||||
requests_get.return_value = mock_resp
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_check_status(
|
||||
agenda_slugs=['foo', 'bar'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
assert str(e.value) == 'Unable to get check status'
|
||||
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
|
||||
with pytest.raises(ChronoError) as e:
|
||||
get_check_status(
|
||||
agenda_slugs=['foo', 'bar'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
assert str(e.value) == 'Unable to get check status'
|
||||
|
||||
data = {'data': []}
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
|
||||
assert (
|
||||
get_check_status(
|
||||
agenda_slugs=['foo', 'bar'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
)
|
||||
== []
|
||||
)
|
||||
assert requests_get.call_args_list[0][0] == (
|
||||
'api/agendas/events/check-status/?user_external_id=user:1&agendas=foo,bar&date_start=2022-09-01&date_end=2022-10-01',
|
||||
)
|
||||
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
|
||||
|
||||
data = {'data': ['foo', 'bar']}
|
||||
with mock.patch('requests.Session.get') as requests_get:
|
||||
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
|
||||
assert get_check_status(
|
||||
agenda_slugs=['foo', 'bar'],
|
||||
user_external_id='user:1',
|
||||
date_start=datetime.date(2022, 9, 1),
|
||||
date_end=datetime.date(2022, 10, 1),
|
||||
) == ['foo', 'bar']
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,136 @@
|
|||
import datetime
|
||||
|
||||
import pytest
|
||||
|
||||
from lingo.invoicing.models import DraftInvoice, DraftInvoiceLine, Invoice, InvoiceLine, Regie
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@pytest.mark.parametrize('draft', [True, False])
|
||||
def test_invoice_total_amount(draft):
|
||||
regie = Regie.objects.create()
|
||||
invoice_model = DraftInvoice if draft else Invoice
|
||||
line_model = DraftInvoiceLine if draft else InvoiceLine
|
||||
|
||||
invoice = invoice_model.objects.create(date_issue=datetime.date.today(), regie=regie)
|
||||
assert invoice.total_amount == 0
|
||||
invoice2 = invoice_model.objects.create(date_issue=datetime.date.today(), regie=regie)
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# line with error status, ignored
|
||||
line = line_model.objects.create(
|
||||
invoice=invoice, # with invoice
|
||||
quantity=0,
|
||||
unit_amount=0,
|
||||
total_amount=0,
|
||||
status='error',
|
||||
)
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 0
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# update line
|
||||
line.unit_amount = 10
|
||||
line.quantity = 1
|
||||
line.total_amount = 10
|
||||
line.save()
|
||||
invoice.refresh_from_db()
|
||||
# still in error
|
||||
assert invoice.total_amount == 0
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# update line status
|
||||
line.status = 'success'
|
||||
line.save()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 10
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# update other field
|
||||
line.unit_amount = 12 # total amount is wrong
|
||||
line.save()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 10
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# update total_amount
|
||||
line.total_amount = 12
|
||||
line.save()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 12
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# create line with invoice, status success
|
||||
line2 = line_model.objects.create(
|
||||
invoice=invoice,
|
||||
quantity=1,
|
||||
unit_amount=20,
|
||||
total_amount=20,
|
||||
status='success',
|
||||
)
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 32
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# change invoice
|
||||
line2.invoice = invoice2
|
||||
line2.save()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 12
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 20
|
||||
|
||||
# delete line
|
||||
line2.delete()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 12
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# create line without invoice, status success
|
||||
line3 = line_model.objects.create(
|
||||
quantity=1,
|
||||
unit_amount=20,
|
||||
total_amount=20,
|
||||
status='success',
|
||||
)
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 12
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# set invoice
|
||||
line3.invoice = invoice
|
||||
line3.save()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 32
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# reset invoice
|
||||
line3.invoice = None
|
||||
line3.save()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 12
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
# no changes
|
||||
line3.save()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 12
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
||||
|
||||
# delete line
|
||||
line3.delete()
|
||||
invoice.refresh_from_db()
|
||||
assert invoice.total_amount == 12
|
||||
invoice2.refresh_from_db()
|
||||
assert invoice2.total_amount == 0
|
Loading…
Reference in New Issue