# lingo/lingo/invoicing/views.py
#
# 551 lines
# 18 KiB
# Python

# lingo - payment and billing system
# Copyright (C) 2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import datetime
import json
from django.contrib import messages
from django.contrib.postgres.fields import JSONField
from django.db import transaction
from django.db.models import CharField, Count, IntegerField, OuterRef, Subquery, Value
from django.db.models.functions import Coalesce
from django.http import Http404, HttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse, reverse_lazy
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext
from django.views.generic import (
CreateView,
DeleteView,
DetailView,
FormView,
ListView,
TemplateView,
UpdateView,
)
from lingo.agendas.models import Agenda
from lingo.invoicing.forms import CampaignForm, DraftInvoiceLineFilterSet, InvoiceLineFilterSet
from lingo.invoicing.models import (
Campaign,
DraftInvoice,
DraftInvoiceLine,
InjectedLine,
InvoiceLine,
Pool,
Regie,
RegieImportError,
)
from lingo.pricing.forms import ImportForm
def is_ajax(request):
    """Tell whether *request* carries the ``X-Requested-With: XMLHttpRequest`` header."""
    requested_with = request.headers.get('x-requested-with')
    return requested_with == 'XMLHttpRequest'
def import_regies(data):
    """Import every regie listed under the ``regies`` key of *data*.

    All regies are imported inside a single database transaction.

    Returns a mapping with the imported objects sorted into the ``created``
    and ``updated`` lists, according to the flag returned by
    ``Regie.import_json``; a key is only present when at least one regie
    falls in that category.
    """
    outcome = collections.defaultdict(list)
    with transaction.atomic():
        for regie_data in data.get('regies', []):
            created, regie_obj = Regie.import_json(regie_data)
            bucket = 'created' if created else 'updated'
            outcome[bucket].append(regie_obj)
    return outcome
class HomeView(TemplateView):
    """Render the invoicing manager home template."""

    template_name = 'lingo/invoicing/manager_home.html'


home = HomeView.as_view()
class RegiesListView(ListView):
    """List the regies."""

    model = Regie
    template_name = 'lingo/invoicing/manager_regie_list.html'


regies_list = RegiesListView.as_view()
class RegieAddView(CreateView):
    """Create a regie from its label, description and cashier role."""

    model = Regie
    fields = ['label', 'description', 'cashier_role']
    template_name = 'lingo/invoicing/manager_regie_form.html'

    def get_success_url(self):
        """Return the detail page URL of the regie that was just created."""
        regie_pk = self.object.pk
        return reverse('lingo-manager-invoicing-regie-detail', args=[regie_pk])


regie_add = RegieAddView.as_view()
class RegieDetailView(DetailView):
    """Show a regie together with the agendas attached to it."""

    model = Regie
    template_name = 'lingo/invoicing/manager_regie_detail.html'

    def get_context_data(self, **kwargs):
        """Add the regie (as ``regie``) and its agendas (as ``agendas``) to the context."""
        regie = self.object
        kwargs.update(
            regie=regie,
            agendas=Agenda.objects.filter(regie=regie),
        )
        return super().get_context_data(**kwargs)


regie_detail = RegieDetailView.as_view()
class RegieEditView(UpdateView):
    """Edit a regie, including its counter name and number format."""

    model = Regie
    fields = ['label', 'description', 'cashier_role', 'counter_name', 'number_format']
    template_name = 'lingo/invoicing/manager_regie_form.html'

    def get_success_url(self):
        """Return the detail page URL of the edited regie."""
        regie_pk = self.object.pk
        return reverse('lingo-manager-invoicing-regie-detail', args=[regie_pk])


regie_edit = RegieEditView.as_view()
class RegieDeleteView(DeleteView):
    """Ask for confirmation, then delete a regie."""

    model = Regie
    template_name = 'lingo/manager_confirm_delete.html'

    def get_success_url(self):
        """Return the URL of the regie list, shown once the regie is deleted."""
        list_url = reverse('lingo-manager-invoicing-regie-list')
        return list_url


regie_delete = RegieDeleteView.as_view()
class RegiesExportView(ListView):
    """Serve all the regies as a downloadable JSON file."""

    model = Regie

    def get(self, request, *args, **kwargs):
        """Return a JSON attachment holding the export of every regie.

        The file name embeds the current date formatted as ``YYYYMMDD``.
        """
        datestamp = datetime.date.today().strftime('%Y%m%d')
        payload = {'regies': [regie.export_json() for regie in self.get_queryset()]}

        response = HttpResponse(content_type='application/json')
        response['Content-Disposition'] = 'attachment; filename="export_regies_{}.json"'.format(datestamp)
        # HttpResponse is file-like: the JSON document is written straight into it
        json.dump(payload, response, indent=2)
        return response


regies_export = RegiesExportView.as_view()
class RegiesImportView(FormView):
    """Import regies from an uploaded JSON file."""

    form_class = ImportForm
    template_name = 'lingo/invoicing/manager_import.html'
    success_url = reverse_lazy('lingo-manager-invoicing-regie-list')

    def form_valid(self, form):
        """Parse the uploaded file, import its regies and report the outcome.

        The form is re-displayed with an error on the ``config_json`` field
        when the file is not valid JSON, when the JSON document is not an
        object, or when the import raises ``RegieImportError``. On success an
        informational message with the number of created and updated regies
        is queued, and the user is redirected to ``success_url``.
        """
        try:
            config_json = json.loads(self.request.FILES['config_json'].read())
        except ValueError:
            form.add_error('config_json', _('File is not in the expected JSON format.'))
            return self.form_invalid(form)
        if not isinstance(config_json, dict):
            # Valid JSON that is not an object (a list, a string, a number...)
            # would make import_regies() fail on data.get(): report it as a
            # form error instead of letting the request crash.
            form.add_error('config_json', _('File is not in the expected JSON format.'))
            return self.form_invalid(form)

        try:
            results = import_regies(config_json)
        except RegieImportError as exc:
            form.add_error('config_json', '%s' % exc)
            return self.form_invalid(form)

        created = len(results.get('created', []))
        updated = len(results.get('updated', []))

        if created:
            create_message = ungettext(
                'A regie was created.',
                '%(count)d regies were created.',
                created,
            ) % {'count': created}
        else:
            create_message = _('No regie created.')

        if updated:
            # NOTE(review): the plural form reads "regie" where "regies" is
            # expected; the msgid is kept as is because changing it would drop
            # the existing translations until the catalogs are regenerated.
            update_message = ungettext(
                'A regie was updated.',
                '%(count)d regie were updated.',
                updated,
            ) % {'count': updated}
        else:
            update_message = _('No regie updated.')

        message = "%s %s" % (create_message, update_message)
        messages.info(self.request, message)

        return super().form_valid(form)


regies_import = RegiesImportView.as_view()
class CampaignListView(ListView):
    """List the invoicing campaigns."""

    model = Campaign
    template_name = 'lingo/invoicing/manager_campaign_list.html'


campaign_list = CampaignListView.as_view()
class CampaignAddView(CreateView):
    """Create a campaign through ``CampaignForm``."""

    model = Campaign
    form_class = CampaignForm
    template_name = 'lingo/invoicing/manager_campaign_form.html'

    def get_success_url(self):
        """Return the detail page URL of the campaign that was just created."""
        campaign_pk = self.object.pk
        return reverse('lingo-manager-invoicing-campaign-detail', args=[campaign_pk])


campaign_add = CampaignAddView.as_view()
class CampaignDetailView(DetailView):
    """Show a campaign and its pools, with per-status line counts for each pool."""

    model = Campaign
    template_name = 'lingo/invoicing/manager_campaign_detail.html'

    def get_context_data(self, **kwargs):
        """Add the annotated pools and two summary flags to the context.

        ``pools`` lists the pools of the campaign, most recently created
        first, each annotated with the number of draft and final lines in
        error, warning and success status. ``has_running_pool`` tells whether
        a pool is in ``registered`` or ``running`` status and ``has_real_pool``
        whether a pool is not a draft.
        """

        def count_by_pool(line_model, **conditions):
            """Build the count of *line_model* rows of a pool matching *conditions* (0 if none)."""
            counts = (
                line_model.objects.filter(pool=OuterRef('pk'))
                .order_by()
                .values('pool')
                .filter(**conditions)
                .annotate(count=Count('pool'))
                .values('count')
            )
            # the subquery yields no row when nothing matches, hence the fallback to 0
            return Coalesce(Subquery(counts, output_field=IntegerField()), Value(0))

        pools = self.object.pool_set.annotate(
            draft_error_count=count_by_pool(DraftInvoiceLine, status='error'),
            draft_warning_count=count_by_pool(DraftInvoiceLine, status='warning'),
            draft_success_count=count_by_pool(DraftInvoiceLine, status='success'),
            # final lines in error only count while their error status is still empty
            error_count=count_by_pool(InvoiceLine, status='error', error_status=''),
            warning_count=count_by_pool(InvoiceLine, status='warning'),
            success_count=count_by_pool(InvoiceLine, status='success'),
        ).order_by('-created_at')

        kwargs['pools'] = pools
        kwargs['has_running_pool'] = any(pool.status in ['registered', 'running'] for pool in pools)
        kwargs['has_real_pool'] = any(not pool.draft for pool in pools)
        return super().get_context_data(**kwargs)


campaign_detail = CampaignDetailView.as_view()
class CampaignEditView(UpdateView):
    """Edit a campaign through ``CampaignForm``."""

    model = Campaign
    form_class = CampaignForm
    template_name = 'lingo/invoicing/manager_campaign_form.html'

    def get_queryset(self):
        """Return the campaigns that may be edited.

        Campaigns having a non-draft pool, or a pool in ``registered`` or
        ``running`` status, are left out.
        """
        campaigns = super().get_queryset()
        campaigns = campaigns.exclude(pool__draft=False)
        return campaigns.exclude(pool__status__in=['registered', 'running'])

    def get_success_url(self):
        """Return the detail page URL of the edited campaign."""
        campaign_pk = self.object.pk
        return reverse('lingo-manager-invoicing-campaign-detail', args=[campaign_pk])


campaign_edit = CampaignEditView.as_view()
class CampaignDeleteView(DeleteView):
    """Ask for confirmation, then delete a campaign along with its draft data."""

    template_name = 'lingo/manager_confirm_delete.html'
    model = Campaign

    def get_queryset(self):
        """Return the campaigns that may be deleted.

        Campaigns having a non-draft pool, or a pool in ``registered`` or
        ``running`` status, are left out.
        """
        return (
            super()
            .get_queryset()
            .exclude(pool__draft=False)
            .exclude(pool__status__in=['registered', 'running'])
        )

    def delete(self, request, *args, **kwargs):
        """Delete the draft lines, draft invoices and pools of the campaign, then the campaign.

        The deletions run in a single transaction so that a failure at any
        step does not leave the campaign partially deleted.
        """
        self.object = self.get_object()
        with transaction.atomic():
            DraftInvoiceLine.objects.filter(pool__campaign=self.object).delete()
            DraftInvoice.objects.filter(pool__campaign=self.object).delete()
            Pool.objects.filter(campaign=self.object).delete()
            return super().delete(request, *args, **kwargs)

    def get_success_url(self):
        """Return the URL of the campaign list, shown once the campaign is deleted."""
        return reverse('lingo-manager-invoicing-campaign-list')


campaign_delete = CampaignDeleteView.as_view()
class PoolDetailView(DetailView):
    """Show the invoiced lines of a pool of a campaign."""

    model = Pool
    pk_url_kwarg = 'pool_pk'
    template_name = 'lingo/invoicing/manager_pool_detail.html'

    def dispatch(self, request, *args, **kwargs):
        """Resolve the campaign from the ``pk`` URL argument, or answer with a 404."""
        campaign_pk = kwargs['pk']
        self.campaign = get_object_or_404(Campaign, pk=campaign_pk)
        return super().dispatch(request, *args, **kwargs)

    def get_queryset(self):
        """Restrict the lookup to the pools of the resolved campaign."""
        return self.campaign.pool_set.all()

    def get_context_data(self, **kwargs):
        """Expose the campaign as ``object``, the pool as ``pool`` and the invoiced lines as ``lines``.

        The pool also receives ``error_count``, ``warning_count`` and
        ``success_count`` attributes computed from all of its lines.
        """
        pool = self.object
        kwargs['object'] = self.campaign
        kwargs['pool'] = pool

        # draft pools keep their lines in the draft model
        line_model = DraftInvoiceLine if pool.draft else InvoiceLine
        all_lines = line_model.objects.filter(pool=pool)

        tally = collections.Counter()
        for line in all_lines:
            if line.status == 'error' and getattr(line, 'error_status', ''):
                # an error whose error status is set is not counted
                continue
            tally[line.status] += 1
        pool.error_count = tally['error']
        pool.warning_count = tally['warning']
        pool.success_count = tally['success']

        invoiced_lines = all_lines.filter(invoice__isnull=False).select_related('invoice')
        kwargs['lines'] = invoiced_lines.order_by('invoice__pk', 'user_external_id', 'pk')
        return super().get_context_data(**kwargs)


pool_detail = PoolDetailView.as_view()
class PoolJournalView(DetailView):
    """Show all the lines of a pool of a campaign, with optional filtering."""

    model = Pool
    pk_url_kwarg = 'pool_pk'
    template_name = 'lingo/invoicing/manager_pool_journal.html'

    def dispatch(self, request, *args, **kwargs):
        """Resolve the campaign from the ``pk`` URL argument, or answer with a 404."""
        campaign_pk = kwargs['pk']
        self.campaign = get_object_or_404(Campaign, pk=campaign_pk)
        return super().dispatch(request, *args, **kwargs)

    def get_queryset(self):
        """Restrict the lookup to the pools of the resolved campaign."""
        return self.campaign.pool_set.all()

    def get_context_data(self, **kwargs):
        """Expose the campaign as ``object``, the pool as ``pool``, its lines and the filterset.

        The pool also receives ``error_count``, ``warning_count`` and
        ``success_count`` attributes computed from all of its lines,
        whatever the filter. ``lines`` holds the filtered lines when at least
        one query-string parameter has a value, and all the lines otherwise.
        """
        pool = self.object
        kwargs['object'] = self.campaign
        kwargs['pool'] = pool

        # draft pools keep their lines in the draft model and use the matching filterset
        if pool.draft:
            line_model, filter_model = DraftInvoiceLine, DraftInvoiceLineFilterSet
        else:
            line_model, filter_model = InvoiceLine, InvoiceLineFilterSet
        all_lines = line_model.objects.filter(pool=pool).order_by('pk').select_related('invoice')

        tally = collections.Counter()
        for line in all_lines:
            if line.status == 'error' and getattr(line, 'error_status', ''):
                # an error whose error status is set is not counted
                continue
            tally[line.status] += 1
        pool.error_count = tally['error']
        pool.warning_count = tally['warning']
        pool.success_count = tally['success']

        data = self.request.GET or None
        line_filterset = filter_model(data=data, queryset=all_lines, pool=pool)
        if data and any(value for value in data.values()):
            kwargs['lines'] = line_filterset.qs
        else:
            kwargs['lines'] = all_lines
        kwargs['filterset'] = line_filterset
        return super().get_context_data(**kwargs)


pool_journal = PoolJournalView.as_view()
class PoolAddView(FormView):
    """Ask for confirmation, then trigger the generation of a campaign."""

    template_name = 'lingo/invoicing/manager_pool_add.html'

    def dispatch(self, request, *args, **kwargs):
        """Resolve the campaign, or answer with a 404.

        Campaigns having a non-draft pool, or a pool in ``registered`` or
        ``running`` status, are not eligible.
        """
        eligible_campaigns = Campaign.objects.exclude(pool__draft=False).exclude(
            pool__status__in=['registered', 'running']
        )
        self.object = get_object_or_404(eligible_campaigns, pk=kwargs['pk'])
        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Expose the campaign as ``object``; no form is provided (``form`` is ``None``)."""
        kwargs.update(form=None, object=self.object)
        return super().get_context_data(**kwargs)

    def post(self, request, *args, **kwargs):
        """Call ``generate()`` on the campaign and go back to the pools section of its page."""
        self.object.generate()
        campaign_url = reverse('lingo-manager-invoicing-campaign-detail', args=[self.object.pk])
        return redirect('%s#open:pools' % campaign_url)


pool_add = PoolAddView.as_view()
class PoolPromoteView(FormView):
    """Ask for confirmation, then promote a draft pool."""

    template_name = 'lingo/invoicing/manager_pool_promote.html'

    def dispatch(self, request, *args, **kwargs):
        """Resolve the pool to promote, or answer with a 404.

        The pool must belong to the campaign given in the URL, be a draft in
        ``completed`` status and have a true ``is_last`` attribute.
        """
        pool = get_object_or_404(
            Pool,
            campaign__id=kwargs['pk'],
            pk=kwargs['pool_pk'],
            draft=True,
            status='completed',
        )
        if not pool.is_last:
            raise Http404
        self.object = pool
        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Expose the campaign as ``object`` and the pool as ``pool``; ``form`` is ``None``."""
        pool = self.object
        kwargs.update(form=None, object=pool.campaign, pool=pool)
        return super().get_context_data(**kwargs)

    def post(self, request, *args, **kwargs):
        """Call ``promote()`` on the pool and go back to the pools section of the campaign page."""
        self.object.promote()
        campaign_url = reverse('lingo-manager-invoicing-campaign-detail', args=[self.object.campaign.pk])
        return redirect('%s#open:pools' % campaign_url)


pool_promote = PoolPromoteView.as_view()
class PoolDeleteView(DeleteView):
    """Ask for confirmation, then delete a draft pool along with its draft data."""

    template_name = 'lingo/manager_confirm_delete.html'
    model = Pool
    pk_url_kwarg = 'pool_pk'

    def dispatch(self, request, *args, **kwargs):
        """Resolve the campaign, or answer with a 404.

        Campaigns having a non-draft pool, or a pool in ``registered`` or
        ``running`` status, are not eligible.
        """
        self.campaign = get_object_or_404(
            Campaign.objects.exclude(pool__draft=False).exclude(pool__status__in=['registered', 'running']),
            pk=kwargs['pk'],
        )
        return super().dispatch(request, *args, **kwargs)

    def get_queryset(self):
        """Restrict the lookup to the draft pools of the resolved campaign."""
        return self.campaign.pool_set.filter(draft=True)

    def delete(self, request, *args, **kwargs):
        """Delete the draft lines and draft invoices of the pool, then the pool itself.

        The deletions run in a single transaction so that a failure at any
        step does not leave the pool partially deleted.
        """
        self.object = self.get_object()
        with transaction.atomic():
            DraftInvoiceLine.objects.filter(pool=self.object).delete()
            DraftInvoice.objects.filter(pool=self.object).delete()
            return super().delete(request, *args, **kwargs)

    def get_success_url(self):
        """Return the URL of the pools section of the campaign page."""
        return '%s#open:pools' % reverse('lingo-manager-invoicing-campaign-detail', args=[self.campaign.pk])


pool_delete = PoolDeleteView.as_view()
class LineSetErrorStatusView(DetailView):
    """Change the error status of an invoice line that is in error."""

    model = InvoiceLine
    pk_url_kwarg = 'line_pk'
    template_name = 'lingo/invoicing/manager_line_detail_fragment.html'

    def get_queryset(self):
        """Restrict the lookup to the lines in error of the pool and campaign given in the URL."""
        lines = super().get_queryset()
        return lines.filter(
            status='error',
            pool=self.kwargs['pool_pk'],
            pool__campaign=self.kwargs['pk'],
        )

    def get_context_data(self, **kwargs):
        """Expose the campaign as ``object``, the pool as ``pool`` and the line as ``line``."""
        line = self.object
        kwargs.update(object=line.pool.campaign, pool=line.pool, line=line)
        return super().get_context_data(**kwargs)

    def get(self, request, *args, **kwargs):
        """Apply the action named by the ``status`` URL argument and save the line.

        ``reset`` empties the error status, ``ignore`` sets it to ``ignored``
        and ``fix`` to ``fixed``; any other value answers with a 404. AJAX
        requests get the rendered line fragment, other requests are redirected
        to the journal of the pool.
        """
        # NOTE(review): this handler changes data on a GET request.
        self.object = self.get_object()

        new_status_by_action = {'reset': '', 'ignore': 'ignored', 'fix': 'fixed'}
        action = kwargs['status']
        if action not in new_status_by_action:
            raise Http404
        self.object.error_status = new_status_by_action[action]
        self.object.save()

        if not is_ajax(self.request):
            journal_url = reverse('lingo-manager-invoicing-pool-journal', args=[kwargs['pk'], kwargs['pool_pk']])
            return redirect(journal_url)
        context = self.get_context_data(object=self.object)
        return self.render_to_response(context)


line_set_error_status = LineSetErrorStatusView.as_view()
class NonInvoicedLineListView(ListView):
    """List, 100 per page, the invoice lines in error and the injected lines without invoice line."""

    paginate_by = 100
    template_name = 'lingo/invoicing/manager_non_invoiced_line_list.html'

    def get_queryset(self):
        """Return the union of the two kinds of lines, as dictionaries.

        The first part holds the invoice lines in ``error`` status with an
        empty error status. The second part holds the injected lines that have
        no related invoice line; the columns they are given no value from the
        model are filled with placeholders (empty name, empty JSON objects,
        ``injected`` status, pool id 0) so that both parts share the same
        columns.
        """
        fields = [
            'pk',
            'event_date',
            'slug',
            'label',
            'quantity',
            'unit_amount',
            'total_amount',
            'user_external_id',
            'payer_external_id',
            'user_name',
            'event',
            'pricing_data',
            'status',
            'pool_id',
        ]
        error_lines = InvoiceLine.objects.filter(status='error', error_status='').values(*fields)
        pending_injected_lines = InjectedLine.objects.filter(invoiceline__isnull=True).annotate(
            user_name=Value('', output_field=CharField()),
            event=Value({}, output_field=JSONField()),
            pricing_data=Value({}, output_field=JSONField()),
            status=Value('injected', output_field=CharField()),
            pool_id=Value(0, output_field=IntegerField()),
        )
        combined = error_lines.union(pending_injected_lines.values(*fields))
        return combined.order_by('event_date', 'user_external_id', 'label', 'pk')

    def get_context_data(self, **kwargs):
        """Add ``error_display`` and ``campaign_id`` to each displayed line in error."""
        context = super().get_context_data(**kwargs)
        pools = Pool.objects.filter(draft=False).in_bulk()
        for line in context['object_list']:
            if line['status'] != 'error':
                continue
            # lines are plain dictionaries: an unsaved instance is built just to get the error label
            unsaved_line = InvoiceLine(status=line['status'], pricing_data=line['pricing_data'])
            line['error_display'] = unsaved_line.get_error_display()
            line['campaign_id'] = pools[line['pool_id']].campaign_id
        return context


non_invoiced_line_list = NonInvoicedLineListView.as_view()