agendas: function to get check_status from chrono (#71528)

and new params for get_subscriptions function
This commit is contained in:
Lauréline Guérin 2022-11-28 16:39:04 +01:00
parent ecb8c5adf3
commit 2b506e6269
No known key found for this signature in database
GPG Key ID: 1FAB9B9B4F93D473
2 changed files with 169 additions and 17 deletions

View File

@ -132,14 +132,41 @@ def get_events(event_slugs, error_message=None, error_message_with_details=None)
return result['data']
def get_subscriptions(agenda_slug, user_external_id):
result = get_chrono_json(
'api/agenda/%s/subscription/?user_external_id=%s' % (agenda_slug, user_external_id)
)
if not result or not result.get('data'):
def get_subscriptions(agenda_slug, user_external_id=None, date_start=None, date_end=None):
url = 'api/agenda/%s/subscription/' % agenda_slug
params = {}
if user_external_id:
params['user_external_id'] = user_external_id
if date_start:
params['date_start'] = date_start
if date_end:
params['date_end'] = date_end
if params:
url += '?%s' % '&'.join(['%s=%s' % (k, v) for k, v in params.items()])
result = get_chrono_json(url)
if not result:
raise ChronoError(_('Unable to get subscription details'))
if result.get('err'):
raise ChronoError(_('Unable to get subscription details (%s)') % result['err_desc'])
if not result.get('data'):
if 'data' not in result:
raise ChronoError(_('Unable to get subscription details'))
return result['data']
def get_check_status(agenda_slugs, user_external_id, date_start, date_end):
result = get_chrono_json(
'api/agendas/events/check-status/?user_external_id=%s&agendas=%s&date_start=%s&date_end=%s'
% (
user_external_id,
','.join(agenda_slugs),
date_start,
date_end,
)
)
if not result:
raise ChronoError(_('Unable to get check status'))
if result.get('err'):
raise ChronoError(_('Unable to get check status (%s)') % result['err_desc'])
if 'data' not in result:
raise ChronoError(_('Unable to get check status'))
return result['data']

View File

@ -1,3 +1,4 @@
import datetime
import json
from unittest import mock
@ -8,6 +9,7 @@ from requests.models import Response
from lingo.agendas.chrono import (
ChronoError,
collect_agenda_data,
get_check_status,
get_event,
get_events,
get_subscriptions,
@ -282,12 +284,12 @@ def test_get_events():
def test_get_subscriptions_no_service(settings):
settings.KNOWN_SERVICES = {}
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
settings.KNOWN_SERVICES = {'other': []}
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
@ -295,7 +297,7 @@ def test_get_subscriptions():
with mock.patch('requests.Session.get') as requests_get:
requests_get.side_effect = ConnectionError()
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
with mock.patch('requests.Session.get') as requests_get:
@ -303,7 +305,7 @@ def test_get_subscriptions():
mock_resp.status_code = 500
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
with mock.patch('requests.Session.get') as requests_get:
@ -311,25 +313,148 @@ def test_get_subscriptions():
mock_resp.status_code = 404
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
get_subscriptions(agenda_slug='foo')
assert str(e.value) == 'Unable to get subscription details'
data = {'data': []}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
with pytest.raises(ChronoError) as e:
get_subscriptions('foo', 'user:1')
assert str(e.value) == 'Unable to get subscription details'
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?user_external_id=user:1',)
assert get_subscriptions(agenda_slug='foo') == []
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
data = {'data': ['foo', 'bar']}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert get_subscriptions('foo', 'user:1') == ['foo', 'bar']
assert get_subscriptions(agenda_slug='foo') == ['foo', 'bar']
data = {'data': []}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
get_subscriptions(agenda_slug='foo', user_external_id='user:1')
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?user_external_id=user:1',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
requests_get.reset_mock()
get_subscriptions(agenda_slug='foo', date_start=datetime.date(2022, 9, 1))
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?date_start=2022-09-01',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
requests_get.reset_mock()
get_subscriptions(agenda_slug='foo', date_end=datetime.date(2022, 10, 1))
assert requests_get.call_args_list[0][0] == ('api/agenda/foo/subscription/?date_end=2022-10-01',)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
requests_get.reset_mock()
get_subscriptions(
agenda_slug='foo',
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert requests_get.call_args_list[0][0] == (
'api/agenda/foo/subscription/?user_external_id=user:1&date_start=2022-09-01&date_end=2022-10-01',
)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
def test_get_check_status_no_service(settings):
settings.KNOWN_SERVICES = {}
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
settings.KNOWN_SERVICES = {'other': []}
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
def test_get_check_status():
with mock.patch('requests.Session.get') as requests_get:
requests_get.side_effect = ConnectionError()
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
with mock.patch('requests.Session.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 500
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
with mock.patch('requests.Session.get') as requests_get:
mock_resp = Response()
mock_resp.status_code = 404
requests_get.return_value = mock_resp
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
with pytest.raises(ChronoError) as e:
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
assert str(e.value) == 'Unable to get check status'
data = {'data': []}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert (
get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
)
== []
)
assert requests_get.call_args_list[0][0] == (
'api/agendas/events/check-status/?user_external_id=user:1&agendas=foo,bar&date_start=2022-09-01&date_end=2022-10-01',
)
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
data = {'data': ['foo', 'bar']}
with mock.patch('requests.Session.get') as requests_get:
requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
assert get_check_status(
agenda_slugs=['foo', 'bar'],
user_external_id='user:1',
date_start=datetime.date(2022, 9, 1),
date_end=datetime.date(2022, 10, 1),
) == ['foo', 'bar']