wip/71528-generate-draft-invoices (#71528) #1

Merged
lguerin merged 4 commits from wip/71528-generate-draft-invoices into main 2022-12-09 17:39:05 +01:00
14 changed files with 2199 additions and 21 deletions

View File

@ -10,6 +10,9 @@ recursive-include lingo/templates *.html *.txt
recursive-include lingo/manager/templates *.html *.txt
recursive-include lingo/pricing/templates *.html *.txt
# sql (migrations)
recursive-include lingo/invoicing/sql *.sql
include COPYING README
include MANIFEST.in
include VERSION

View File

@ -132,14 +132,41 @@ def get_events(event_slugs, error_message=None, error_message_with_details=None)
return result['data']
def get_subscriptions(agenda_slug, user_external_id):
result = get_chrono_json(
'api/agenda/%s/subscription/?user_external_id=%s' % (agenda_slug, user_external_id)
)
if not result or not result.get('data'):
def get_subscriptions(agenda_slug, user_external_id=None, date_start=None, date_end=None):
url = 'api/agenda/%s/subscription/' % agenda_slug
params = {}
if user_external_id:
params['user_external_id'] = user_external_id
if date_start:
params['date_start'] = date_start
if date_end:
params['date_end'] = date_end
if params:
url += '?%s' % '&'.join(['%s=%s' % (k, v) for k, v in params.items()])
result = get_chrono_json(url)
if not result:
raise ChronoError(_('Unable to get subscription details'))
if result.get('err'):
raise ChronoError(_('Unable to get subscription details (%s)') % result['err_desc'])
if not result.get('data'):
if 'data' not in result:
raise ChronoError(_('Unable to get subscription details'))
return result['data']
def get_check_status(agenda_slugs, user_external_id, date_start, date_end):
result = get_chrono_json(
'api/agendas/events/check-status/?user_external_id=%s&agendas=%s&date_start=%s&date_end=%s'
% (
user_external_id,
','.join(agenda_slugs),
date_start,
date_end,
)
)
if not result:
raise ChronoError(_('Unable to get check status'))
if result.get('err'):
raise ChronoError(_('Unable to get check status (%s)') % result['err_desc'])
if 'data' not in result:
raise ChronoError(_('Unable to get check status'))
return result['data']

View File

View File

@ -0,0 +1,45 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
from django.core.management.base import BaseCommand, CommandError
from lingo.invoicing.utils import generate_invoices
class Command(BaseCommand):
help = 'Generate invoicing for a period'
def add_arguments(self, parser):
parser.add_argument('date_start')
parser.add_argument('date_end')
def handle(self, *args, **options):
try:
date_start = datetime.datetime.fromisoformat(options['date_start']).date()
except ValueError:
raise CommandError('Bad value "%s" for date_start' % options['date_start'])
try:
date_end = datetime.datetime.fromisoformat(options['date_end']).date()
except ValueError:
raise CommandError('Bad value "%s" for date_end' % options['date_end'])
generate_invoices(date_start=date_start, date_end=date_end)
self.stdout.write(
self.style.SUCCESS('Invoicing generation OK (start: %s, end: %s)' % (date_start, date_end))
)

View File

@ -0,0 +1,124 @@
import django.contrib.postgres.fields.jsonb
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('invoicing', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='Invoice',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('label', models.CharField(max_length=300, verbose_name='Label')),
('total_amount', models.DecimalField(decimal_places=2, max_digits=9, default=0)),
('date_issue', models.DateField(verbose_name='Issue date')),
('payer', models.CharField(max_length=300, verbose_name='Payer')),
(
'regie',
models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
to='invoicing.Regie',
),
),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='InvoiceLine',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('slug', models.SlugField(max_length=250)),
('label', models.CharField(max_length=260)),
('quantity', models.FloatField()),
('unit_amount', models.DecimalField(decimal_places=2, max_digits=9)),
('total_amount', models.DecimalField(decimal_places=2, max_digits=9)),
('user_external_id', models.CharField(max_length=250)),
('payer_external_id', models.CharField(max_length=250)),
('event', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
('pricing_data', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
(
'status',
models.CharField(choices=[('success', 'Success'), ('error', 'Error')], max_length=10),
),
(
'invoice',
models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.PROTECT,
to='invoicing.Invoice',
related_name='lines',
),
),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='DraftInvoice',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('label', models.CharField(max_length=300, verbose_name='Label')),
('total_amount', models.DecimalField(decimal_places=2, max_digits=9, default=0)),
('date_issue', models.DateField(verbose_name='Issue date')),
('payer', models.CharField(max_length=300, verbose_name='Payer')),
(
'regie',
models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='invoicing.Regie'),
),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='DraftInvoiceLine',
fields=[
(
'id',
models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
),
('slug', models.SlugField(max_length=250)),
('label', models.CharField(max_length=260)),
('quantity', models.FloatField()),
('unit_amount', models.DecimalField(decimal_places=2, max_digits=9)),
('total_amount', models.DecimalField(decimal_places=2, max_digits=9)),
('user_external_id', models.CharField(max_length=250)),
('payer_external_id', models.CharField(max_length=250)),
('event', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
('pricing_data', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
(
'status',
models.CharField(choices=[('success', 'Success'), ('error', 'Error')], max_length=10),
),
(
'invoice',
models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.PROTECT,
to='invoicing.DraftInvoice',
related_name='lines',
),
),
],
options={
'abstract': False,
},
),
]

View File

@ -0,0 +1,27 @@
import os
from django.db import migrations
with open(
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'..',
'sql',
'invoice_triggers_for_amount.sql',
)
) as sql_file:
sql_triggers = sql_file.read()
sql_forwards = sql_triggers
class Migration(migrations.Migration):
dependencies = [
('invoicing', '0002_invoice'),
]
operations = [
migrations.RunSQL(sql=sql_forwards, reverse_sql=migrations.RunSQL.noop),
]

View File

@ -17,6 +17,7 @@
import copy
from django.contrib.auth.models import Group
from django.contrib.postgres.fields import JSONField
from django.db import models
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
@ -84,3 +85,53 @@ class Regie(models.Model):
regie, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
return created, regie
class AbstractInvoice(models.Model):
label = models.CharField(_('Label'), max_length=300)
total_amount = models.DecimalField(max_digits=9, decimal_places=2, default=0)
lguerin marked this conversation as resolved Outdated
Outdated
Review

On devra gérer les notions de montant total et montant dû (reste à payer) sur les factures, je pense qu'on peut déjà prévoir l'affaire et renommer "amount" en "total_amount".

On devra gérer les notions de montant total et montant dû (reste à payer) sur les factures, je pense qu'on peut déjà prévoir l'affaire et renommer "amount" en "total_amount".

ok je fais ça

ok je fais ça
date_issue = models.DateField(_('Issue date'))
regie = models.ForeignKey(Regie, on_delete=models.PROTECT)
payer = models.CharField(_('Payer'), max_length=300)
class Meta:
abstract = True
class DraftInvoice(AbstractInvoice):
pass
class Invoice(AbstractInvoice):
pass
class AbstractInvoiceLine(models.Model):
slug = models.SlugField(max_length=250)
label = models.CharField(max_length=260)
quantity = models.FloatField()
unit_amount = models.DecimalField(max_digits=9, decimal_places=2)
total_amount = models.DecimalField(max_digits=9, decimal_places=2)
user_external_id = models.CharField(max_length=250)
payer_external_id = models.CharField(max_length=250)
lguerin marked this conversation as resolved Outdated
Outdated
Review

On parle ici adult_external_id et dans la facture de payer... c'est pas la même chose ? On pourrait pas déjà appeler ça "payer" ici aussi ?

On parle ici adult_external_id et dans la facture de payer... c'est pas la même chose ? On pourrait pas déjà appeler ça "payer" ici aussi ?

dans tarification, c'est adult_external_id, mais peu importe on peut nommer ça payer pour que ce soit plus explicite

dans tarification, c'est adult_external_id, mais peu importe on peut nommer ça payer pour que ce soit plus explicite
event = JSONField(default=dict)
pricing_data = JSONField(default=dict)
status = models.CharField(
max_length=10,
choices=[
('success', _('Success')),
('error', _('Error')),
],
)
class Meta:
abstract = True
class DraftInvoiceLine(AbstractInvoiceLine):
invoice = models.ForeignKey(DraftInvoice, on_delete=models.PROTECT, null=True, related_name='lines')
class InvoiceLine(AbstractInvoiceLine):
invoice = models.ForeignKey(Invoice, on_delete=models.PROTECT, null=True, related_name='lines')

j'ai repris les models proposés dans #69728, avec quelques ajustements

j'ai repris les models proposés dans #69728, avec quelques ajustements

View File

@ -0,0 +1,44 @@
CREATE OR REPLACE FUNCTION set_invoice_total_amount() RETURNS TRIGGER AS $$
DECLARE
invoice_ids integer[];
BEGIN
IF TG_OP = 'INSERT' THEN
invoice_ids := ARRAY[NEW.invoice_id];
ELSIF TG_OP = 'DELETE' THEN
invoice_ids := ARRAY[OLD.invoice_id];
ELSIF TG_OP = 'UPDATE' THEN
invoice_ids := ARRAY[NEW.invoice_id, OLD.invoice_id];
END IF;
EXECUTE 'UPDATE ' || substring(TG_TABLE_NAME for length(TG_TABLE_NAME) - 4) || ' i
SET total_amount = COALESCE(
(
SELECT SUM(l.total_amount)
FROM ' || TG_TABLE_NAME || ' l
WHERE l.invoice_id = i.id
AND l.status = ''success''
), 0
)
WHERE id = ANY($1);' USING invoice_ids;
IF TG_OP = 'DELETE' THEN
RETURN OLD;
ELSE
RETURN NEW;
END IF;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS set_draftinvoice_total_amount_trg ON invoicing_draftinvoiceline;
CREATE TRIGGER set_draftinvoice_total_amount_trg
AFTER INSERT OR UPDATE OF total_amount, invoice_id, status OR DELETE ON invoicing_draftinvoiceline
FOR EACH ROW
EXECUTE PROCEDURE set_invoice_total_amount();
DROP TRIGGER IF EXISTS set_invoice_total_amount_trg ON invoicing_invoiceline;
CREATE TRIGGER set_invoice_total_amount_trg
AFTER INSERT OR UPDATE OF total_amount, invoice_id, status OR DELETE ON invoicing_invoiceline
FOR EACH ROW
EXECUTE PROCEDURE set_invoice_total_amount();

246
lingo/invoicing/utils.py Normal file
View File

@ -0,0 +1,246 @@
# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
from django.test.client import RequestFactory
from django.utils.translation import ugettext_lazy as _
from lingo.agendas.chrono import get_check_status, get_subscriptions
from lingo.agendas.models import Agenda
from lingo.invoicing.models import DraftInvoice, DraftInvoiceLine
from lingo.pricing.models import AgendaPricing, AgendaPricingNotFound, PricingError
def get_agendas(date_start, date_end):
agendas_pricings = AgendaPricing.objects.filter(flat_fee_schedule=False).extra(
where=["(date_start, date_end) OVERLAPS (%s, %s)"], params=[date_start, date_end]
)
return Agenda.objects.filter(pk__in=agendas_pricings.values('agendas')).order_by('pk')
def get_all_subscriptions(agendas, date_start, date_end):
# result:
# {
# 'user_id': {
# 'agenda_slug': [sub1, sub2, ...],
# 'agenda_slug2': [sub1, sub2, ...],
# },
# ...
# }
all_subscriptions = {}
for agenda in agendas:
subscriptions = get_subscriptions(agenda_slug=agenda.slug, date_start=date_start, date_end=date_end)
for subscription in subscriptions:
if subscription['user_external_id'] not in all_subscriptions:
all_subscriptions[subscription['user_external_id']] = {}
user_subs = all_subscriptions[subscription['user_external_id']]
if agenda.slug not in user_subs:
user_subs[agenda.slug] = []
user_subs[agenda.slug].append(subscription)
return all_subscriptions
def get_invoice_lines_for_user(
agendas, agendas_pricings, user_external_id, subscriptions, date_start, date_end
):
def get_agenda_pricing(agendas_pricings_for_agenda, date_event):
# same logic as AgendaPricing.get_agenda_pricing
for agenda_pricing in agendas_pricings_for_agenda:
if agenda_pricing.date_start > date_event:
continue
if agenda_pricing.date_end <= date_event:
continue
return agenda_pricing
raise AgendaPricingNotFound
def get_subscription(subscriptions, event_date):
# get subscription matching event_date
for subscription in subscriptions:
sub_start_date = datetime.date.fromisoformat(subscription['date_start'])
sub_end_date = datetime.date.fromisoformat(subscription['date_end'])
if sub_start_date > event_date:
continue
if sub_end_date <= event_date:
continue
return subscription
if not agendas or not subscriptions:
return []
agendas_pricings_by_agendas = collections.defaultdict(list)
for agenda_pricing in agendas_pricings:
if agenda_pricing.flat_fee_schedule:
continue
for agenda in agenda_pricing.agendas.all():
agendas_pricings_by_agendas[agenda.slug].append(agenda_pricing)
# get check status for user_external_id, on agendas, for the period
check_status_list = get_check_status(
agenda_slugs=list(subscriptions.keys()),
user_external_id=user_external_id,
date_start=date_start,
date_end=date_end,
request = RequestFactory().get('/')  # XXX

request qui in fine arrive sur pricing.get_extra_variables, pour un RequestContext pour le rendu de gabarits, pour que le context processor qui fournit la variable "cards" soit disponible; c'est bien ça ?

Peut-être on pourrait ne passer request nulle part et avoir cet appel à RequestFactory directement dans pricing.get_extra_variables ?

> request = RequestFactory().get('/') # XXX request qui in fine arrive sur pricing.get_extra_variables, pour un RequestContext pour le rendu de gabarits, pour que le context processor qui fournit la variable "cards" soit disponible; c'est bien ça ? Peut-être on pourrait ne passer request nulle part et avoir cet appel à RequestFactory directement dans pricing.get_extra_variables ?

Je me demandais, sans avoir vérifié, si en terme de perfs c'était consommateur d'instancier RequestFactory à chaque ligne de facturation.

Je me demandais, sans avoir vérifié, si en terme de perfs c'était consommateur d'instancier RequestFactory à chaque ligne de facturation.
)
request = RequestFactory().get('/') # XXX
agendas_by_slug = {a.slug: a for a in agendas}
# build lines from check status
lines = []
for check_status in check_status_list:
serialized_event = check_status['event']
event_date = datetime.datetime.fromisoformat(serialized_event['start_datetime']).date()
event_slug = '%s@%s' % (serialized_event['agenda'], serialized_event['slug'])
if serialized_event['agenda'] not in subscriptions:
# should not happen, check-status endpoint is based on real subscriptions
continue
serialized_subscription = get_subscription(
subscriptions=subscriptions[serialized_event['agenda']],
event_date=event_date,
)
if not serialized_subscription:
# should not happen, check-status endpoint is based on real subscriptions
continue
agenda = agendas_by_slug[serialized_event['agenda']]
try:
agenda_pricing = get_agenda_pricing(agendas_pricings_by_agendas.get(agenda.slug), event_date)
pricing_data = agenda_pricing.get_pricing_data_for_event(
request=request,
agenda=agenda,
event=serialized_event,
subscription=serialized_subscription,
check_status=check_status['check_status'],
booking=check_status['booking'],
user_external_id=user_external_id,
adult_external_id=user_external_id, # XXX
)
        # XXX error, warning, or ignore ?

Je pense que la politique globale pour le moment pourrait être d'arrêter sur une erreur; mais dessous pour gérer PricingError il y a ajout de l'info d'erreur dans une DraftInvoiceLine et peut-être que la même chose pourrait avoir lieu ici.

L'inconvénient si je capte bien c'est qu'on va avoir cette ligne d'erreur répétée pour toutes les présences, je dirais que c'est ok.

> # XXX error, warning, or ignore ? Je pense que la politique globale pour le moment pourrait être d'arrêter sur une erreur; mais dessous pour gérer PricingError il y a ajout de l'info d'erreur dans une DraftInvoiceLine et peut-être que la même chose pourrait avoir lieu ici. L'inconvénient si je capte bien c'est qu'on va avoir cette ligne d'erreur répétée pour toutes les présences, je dirais que c'est ok.

Je réfléchissais à un moyen de rattraper les erreurs (genre un pointage manquant), pour les ajouter à la facturation suivante.
Et du coup, avoir une ligne par erreur, ça permettrait peut-être de retrouver les erreurs pour les rejouer.
(ça vaut pour les PricingError)

Sauf que dans le cas d'une grille tarifaire manquante pour une date donnée, on peut se contenter de noter ça en warning, charge à l'agent de se dire "ha zut mon paramétrage", de corriger et rejouer.

Mais pas sûr, on verra dans des prochains tickets :)

Je réfléchissais à un moyen de rattraper les erreurs (genre un pointage manquant), pour les ajouter à la facturation suivante. Et du coup, avoir une ligne par erreur, ça permettrait peut-être de retrouver les erreurs pour les rejouer. (ça vaut pour les PricingError) Sauf que dans le cas d'une grille tarifaire manquante pour une date donnée, on peut se contenter de noter ça en warning, charge à l'agent de se dire "ha zut mon paramétrage", de corriger et rejouer. Mais pas sûr, on verra dans des prochains tickets :)
except AgendaPricingNotFound:
# no pricing, no invoice
# can happen if pricing model defined only on a part of the requested period
# XXX error, warning, or ignore ?
continue
except PricingError as e:
# XXX explicit each error
# XXX and log context ?
pricing_error = {'error': e.details}
lines.append(
DraftInvoiceLine(
slug=event_slug,
label=serialized_event['label'],
quantity=0,
unit_amount=0,
total_amount=0,
user_external_id=user_external_id,
payer_external_id=user_external_id, # XXX
event=serialized_event,
pricing_data=pricing_error,
status='error',
)
)
else:
# XXX log all context !
lines.append(
DraftInvoiceLine(
slug=event_slug,
label=serialized_event['label'],
quantity=1,
unit_amount=pricing_data['pricing'],
total_amount=pricing_data['pricing'],
user_external_id=user_external_id,
payer_external_id=user_external_id, # XXX
event=serialized_event,
pricing_data=pricing_data,
status='success',
)
)
DraftInvoiceLine.objects.bulk_create(lines)
return lines
def get_all_invoice_lines(agendas, subscriptions, date_start, date_end):
agendas_pricings = (
AgendaPricing.objects.filter(flat_fee_schedule=False)
.extra(where=["(date_start, date_end) OVERLAPS (%s, %s)"], params=[date_start, date_end])
.prefetch_related('agendas', 'pricing__criterias', 'pricing__categories')
)
lines = []
for user_external_id, user_subs in subscriptions.items():
# generate lines for each user
lines += get_invoice_lines_for_user(
agendas=agendas,
agendas_pricings=agendas_pricings,
user_external_id=user_external_id,
subscriptions=user_subs,
date_start=date_start,
date_end=date_end,
)
return lines
def generate_invoices_from_lines(agendas, all_lines, date_start, date_end):
agendas_by_slug = {a.slug: a for a in agendas}
# regroup lines by regie, and by payer_external_id (payer)
lines_by_regie = {}
for line in all_lines:
if line.status != 'success':
# ignore lines in error
continue
agenda_slug = line.event['agenda']
if agenda_slug not in agendas_by_slug:
# should not happen
        # XXX what should we do if regie is not configured ?

On raise et ça s'arrête là, pour quelque chose gros comme ça je pense qu'on n'a pas à collecter toutes les erreurs pour les présenter en une fois, que l'agent peut souffrir de cliquer plusieurs fois et de chaque fois aller corriger une erreur, il ne devrait pas y en avoir trop.

> # XXX what should we do if regie is not configured ? On raise et ça s'arrête là, pour quelque chose gros comme ça je pense qu'on n'a pas à collecter toutes les erreurs pour les présenter en une fois, que l'agent peut souffrir de cliquer plusieurs fois et de chaque fois aller corriger une erreur, il ne devrait pas y en avoir trop.
continue
regie = agendas_by_slug[agenda_slug].regie
if not regie:
# XXX what should we do if regie is not configured ?
continue
if regie.pk not in lines_by_regie:
lines_by_regie[regie.pk] = {}
regie_subs = lines_by_regie[regie.pk]
if line.payer_external_id not in regie_subs:
regie_subs[line.payer_external_id] = []
regie_subs[line.payer_external_id].append(line)
# generate invoices by regie and by payer_external_id (payer)
invoices = []
for regie_id, regie_subs in lines_by_regie.items():
            date_issue=date_end,  # XXX

À voir avec avec Stef comment on détermine la date qui apparait sur la facture ? (si j'ai bien compris l'enjeu).

> date_issue=date_end, # XXX À voir avec avec Stef comment on détermine la date qui apparait sur la facture ? (si j'ai bien compris l'enjeu).

Je pense que pour #71910 je vais ajouter un champ date_issue ailleurs, et setter les factures avec.

Je pense que pour #71910 je vais ajouter un champ date_issue ailleurs, et setter les factures avec.
for payer_external_id, adult_lines in regie_subs.items():
invoice = DraftInvoice.objects.create(
label=_('Invoice from %s to %s') % (date_start, date_end - datetime.timedelta(days=1)),
date_issue=date_end, # XXX
regie_id=regie_id,
payer=payer_external_id,
)
DraftInvoiceLine.objects.filter(pk__in=[line.pk for line in adult_lines]).update(invoice=invoice)
invoices.append(invoice)
return invoices
def generate_invoices(date_start, date_end):
# get agendas with pricing corresponding to the period
agendas = get_agendas(date_start=date_start, date_end=date_end)
# get subscriptions for each agenda, for the period
subscriptions = get_all_subscriptions(agendas=agendas, date_start=date_start, date_end=date_end)
# get invoice lines for all subscribed users, for each agenda in the corresponding period
lines = get_all_invoice_lines(
agendas=agendas, subscriptions=subscriptions, date_start=date_start, date_end=date_end
)
# and generate invoices
generate_invoices_from_lines(agendas=agendas, all_lines=lines, date_start=date_start, date_end=date_end)

View File

@ -507,12 +507,16 @@ class AgendaPricing(models.Model):
def compute_pricing(self, context):
criterias = {}
categories = []
# for each category (ordered)
for category in self.pricing.categories.all().order_by('pricingcriteriacategory__order'):

plus nécessaire depuis #66899

plus nécessaire depuis #66899
# for each category
for category in self.pricing.categories.all():
criterias[category.slug] = None
categories.append(category.slug)
# find the first matching criteria (criterias are ordered)
for criteria in self.pricing.criterias.filter(category=category, default=False):
for criteria in self.pricing.criterias.all():

(pour utiliser le prefetch)

(pour utiliser le prefetch)
if criteria.category_id != category.pk:
continue
if criteria.default:
continue
condition = criteria.compute_condition(context)
if condition:
criterias[category.slug] = criteria.slug
@ -520,7 +524,9 @@ class AgendaPricing(models.Model):
if criterias[category.slug] is not None:
continue
# if no match, take default criteria if only once defined
default_criterias = self.pricing.criterias.filter(category=category, default=True)
default_criterias = [
c for c in self.pricing.criterias.all() if c.default and c.category_id == category.pk
]
if len(default_criterias) > 1:
raise MultipleDefaultCriteriaCondition(details={'category': category.slug})
if not default_criterias:

View File

@ -1,3 +1,4 @@
import datetime
import json
from unittest import mock
@ -8,6 +9,7 @@ from requests.models import Response
from lingo.agendas.chrono import (
ChronoError,
collect_agenda_data,
get_check_status,
get_event,
get_events,
get_subscriptions,
@ -282,12 +284,12 @@ def test_get_events():
def test_get_subscriptions_no_service(settings):
settings.KNOWN_SERVICES = {}
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
settings.KNOWN_SERVICES = {'other': []}
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
@ -295,7 +297,7 @@ def test_get_subscriptions():
with mock.patch('requests.Session.get') as requests_get:
requests_get.side_effect = ConnectionError()
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
with mock.patch('requests.Session.get') as requests_get:
@ -303,7 +305,7 @@ def test_get_subscriptions():
mock_resp.status_code = 500
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
with mock.patch('requests.Session.get') as requests_get:
@ -311,25 +313,148 @@ def test_get_subscriptions():
mock_resp.status_code = 404
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
data = {'data': []}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
assert str(e.value) == 'Unable to get subscription details'
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?user_external_id=user:1',)
assert get_subscriptions(agenda_slug='foo') == []
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
data = {'data': ['foo', 'bar']}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert get_subscriptions('foo', 'user:1') == ['foo', 'bar']
assert get_subscriptions(agenda_slug='foo') == ['foo', 'bar']
data = {'data': []}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
get_subscriptions(agenda_slug='foo', user_external_id='user:1')
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?user_external_id=user:1',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
requests_get.reset_mock()
get_subscriptions(agenda_slug='foo', date_start=datetime.date(2022, 9, 1))
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?date_start=2022-09-01',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
requests_get.reset_mock()
get_subscriptions(agenda_slug='foo', date_end=datetime.date(2022, 10, 1))
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?date_end=2022-10-01',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
requests_get.reset_mock()
get_subscriptions(
agenda_slug='foo',
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert requests_get.call_args_list[0][0] == (
'api/agenda/foo/subscription/?user_external_id=user:1&date_start=2022-09-01&date_end=2022-10-01',
)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
def test_get_check_status_no_service(settings):
settings.KNOWN_SERVICES = {}
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
settings.KNOWN_SERVICES = {'other': []}
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
def test_get_check_status():
with mock.patch('requests.Session.get') as requests_get:
requests_get.side_effect = ConnectionError()
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
with mock.patch('requests.Session.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 500
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
with mock.patch('requests.Session.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 404
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
data = {'data': []}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert (
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
== []
)
assert requests_get.call_args_list[0][0] == (
'api/agendas/events/check-status/?user_external_id=user:1&agendas=foo,bar&date_start=2022-09-01&date_end=2022-10-01',
)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
data = {'data': ['foo', 'bar']}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
) == ['foo', 'bar']

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,136 @@
import datetime
import pytest
from lingo.invoicing.models import DraftInvoice, DraftInvoiceLine, Invoice, InvoiceLine, Regie
pytestmark = pytest.mark.django_db
@pytest.mark.parametrize('draft', [True, False])
def test_invoice_total_amount(draft):
regie = Regie.objects.create()
invoice_model = DraftInvoice if draft else Invoice
line_model = DraftInvoiceLine if draft else InvoiceLine
invoice = invoice_model.objects.create(date_issue=datetime.date.today(), regie=regie)
assert invoice.total_amount == 0
invoice2 = invoice_model.objects.create(date_issue=datetime.date.today(), regie=regie)
assert invoice2.total_amount == 0
# line with error status, ignored
line = line_model.objects.create(
invoice=invoice, # with invoice
quantity=0,
unit_amount=0,
total_amount=0,
status='error',
)
invoice.refresh_from_db()
assert invoice.total_amount == 0
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# update line
line.unit_amount = 10
line.quantity = 1
line.total_amount = 10
line.save()
invoice.refresh_from_db()
# still in error
assert invoice.total_amount == 0
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# update line status
line.status = 'success'
line.save()
invoice.refresh_from_db()
assert invoice.total_amount == 10
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# update other field
line.unit_amount = 12 # total amount is wrong
line.save()
invoice.refresh_from_db()
assert invoice.total_amount == 10
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# update total_amount
line.total_amount = 12
line.save()
invoice.refresh_from_db()
assert invoice.total_amount == 12
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# create line with invoice, status success
line2 = line_model.objects.create(
invoice=invoice,
quantity=1,
unit_amount=20,
total_amount=20,
status='success',
)
invoice.refresh_from_db()
assert invoice.total_amount == 32
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# change invoice
line2.invoice = invoice2
line2.save()
invoice.refresh_from_db()
assert invoice.total_amount == 12
invoice2.refresh_from_db()
assert invoice2.total_amount == 20
# delete line
line2.delete()
invoice.refresh_from_db()
assert invoice.total_amount == 12
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# create line without invoice, status success
line3 = line_model.objects.create(
quantity=1,
unit_amount=20,
total_amount=20,
status='success',
)
invoice.refresh_from_db()
assert invoice.total_amount == 12
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# set invoice
line3.invoice = invoice
line3.save()
invoice.refresh_from_db()
assert invoice.total_amount == 32
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# reset invoice
line3.invoice = None
line3.save()
invoice.refresh_from_db()
assert invoice.total_amount == 12
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# no changes
line3.save()
invoice.refresh_from_db()
assert invoice.total_amount == 12
invoice2.refresh_from_db()
assert invoice2.total_amount == 0
# delete line
line3.delete()
invoice.refresh_from_db()
assert invoice.total_amount == 12
invoice2.refresh_from_db()
assert invoice2.total_amount == 0