import asyncio
import datetime
import inspect
import json
import logging
import os
import traceback
import typing
import warnings
import aiohttp.client_exceptions
import keyring
import versionedconfig as _vc
import divera.api.endpointwrappers
import divera.api.v2.auth
import divera.api.v2.pull
import divera.api.v2.statusgeber
import divera.config
from divera.config.migrations import *
import divera.models
import divera.models.status
import divera.socket
import divera.triggers
import divera.triggers.socketevents
import divera.utils
[docs]class Client:
def __init__(
self,
config_folder: str = None,
use_recommended_callbacks: bool = True,
):
self.config_folder = os.path.expanduser(config_folder) if config_folder else None
self.config_file = self.config_folder.rstrip('/') + '/' + 'divera.json' if config_folder else None
self.use_recommended_callbacks = use_recommended_callbacks
self.config = _vc.Configuration(
{},
)
self.read_config()
self.poll = False
self.main_task = None
self.callbacks = []
self.socket = divera.socket.SocketConnection(
jwt=self.get_jwt,
ucr=self.get_ucr,
config=self.config['access']['socket'],
)
if self.config_folder is None:
logging.warning(
'You have not specified a configuration folder.'
' Make sure to call Client.logout() before you terminate your program.',
)
self.block_polling = 0
[docs] def read_config(self):
"""
Load the configuration data from the `divera.json` inside the folder specified on `__init__`.
The configuration also gets updated to the latest version.
"""
def detect_and_convert_objects(data):
output = {}
models = divera.utils.get_all_models()
return_output = False
for key in data:
val = data[key]
for m in models:
if key != m.__name__:
continue
if type(val) is not list:
continue
for object_raw in val:
if key not in output:
output[key] = []
assert type(object_raw) is dict
output[key].append(
m(data=object_raw),
)
return_output = True
if return_output:
return output
else:
return data
if self.config_file:
if os.path.exists(self.config_file):
with open(self.config_file) as file:
text = file.read()
restored_config = json.loads(
text,
object_hook=detect_and_convert_objects,
) or {}
file.close()
else:
restored_config = {}
config = restored_config
else:
config = {}
self.config = _vc.Configuration(
_vc.migrate(
config,
list_of_migrations=[
mig()
for mig in DiveraPythonConfigMigration.__subclasses__()
],
),
path=self.config_file,
)
[docs] def store_config(self):
"""
Save the configuration data to the `divera.json` inside the folder specified on `__init__`.
"""
if not self.config_file:
warnings.warn("No file has been specified to store the configuration to.")
return
if not os.path.exists(self.config_folder):
os.makedirs(self.config_folder)
with open(self.config_file, 'w') as f:
f.write(
json.dumps(
self.config,
indent=4,
cls=divera.config.ModelEncoder,
)
)
f.close()
[docs] def get_username(self):
return self.config['access']["username"]
[docs] def get_domain(self):
return self.config['access']["domain"]
[docs] def get_protocol(self):
return self.config['access']['protocol']
[docs] def get_url(self):
return f"{self.get_protocol()}://{self.get_domain()}"
[docs] def set_domain(
self,
domain: str,
):
self.config['access']['domain'] = domain.rstrip('/').split('://')[-1]
[docs] async def login(
self,
username,
password,
domain: str = None,
url_websocket: str = None,
):
"""
Log in to an account.
:param username: email address of the user
:param password: password if the user
:param domain: domain of the server (needed if not set in config yet)
:param url_websocket: url of the websocket of the server for real time updated
:return:
"""
if domain:
self.set_domain(
domain=domain,
)
if self.config['access']['url websocket'] is None or url_websocket is not None:
self.config['access']['url websocket'] = url_websocket
if not self.get_domain():
raise ValueError('Cannot login without knowing the domain.')
if username:
self.config['access']['username'] = username
if not self.get_username():
raise ValueError('Cannot login without knowing the user name.')
login = divera.api.v2.auth.Login(
username=self.get_username(),
password=password,
)
resp = await self.request(
login,
)
if resp['success']:
access_token = resp['data']['user']['access_token']
await self.set_access_token(
access_token=access_token,
)
# Remove sensitive information before copying it to config
del resp['data']['user']['access_token']
self.config['user'] = resp['data']['user']
else:
raise PermissionError("unable to log in using provided credentials", resp)
[docs] async def login_with_access_token(
self,
username: str,
access_token: str,
domain: str,
):
"""
If the user should not be asked for a password, they can be asked for their access_token instead.
This is also referred to as 'access key' and can be found in the DEBUG section of your account settings.
:param username: Your username (email)
:param access_token: The access token / access key from the DEBUG section of your account settings.
:param domain: The domain of your divera instance
"""
self.config['access']['username'] = username
self.set_domain(
domain=domain,
)
await self.set_access_token(
access_token=access_token,
)
[docs] def logout(self):
"""
Delete the password from keyring.
"""
if self.get_domain() and self.get_username():
keyring.delete_password(
service_name=self.get_domain(),
username=self.config['access']['username'],
)
[docs] async def set_access_token(self, access_token: str):
"""
:param access_token:
:return:
"""
keyring.set_password(
service_name=self.get_domain(),
username=self.get_username(),
password=access_token,
)
[docs] def get_access_token(self):
try:
return keyring.get_password(
service_name=self.get_domain(),
username=self.get_username(),
)
except TypeError:
return None
[docs] def is_logged_in(self):
return bool(self.get_access_token())
[docs] async def get_jwt(
self,
version: str = '',
):
"""
Get a new jason web token
:param version: If you add any of ['ws', 'api'] to get the respective jwt
:return: jason web token as string
"""
jwt_response = await self.request(
divera.api.v2.auth.JWT(),
)
res = jwt_response['data']['jwt' + (f'_{version}' if version else '')]
return res
[docs] async def get_ucr(self):
pull_response = await self.request(
divera.api.v2.pull.All(),
)
return pull_response['data']['ucr_default']
[docs] async def request(
self,
endpoint: [
divera.api.endpointwrappers.EndPoint,
typing.AsyncGenerator,
typing.Generator,
typing.Any,
],
) -> typing.Any:
if issubclass(type(endpoint), divera.api.endpointwrappers.EndPoint):
return await self.request(
await self.perform_web_request(endpoint),
)
if '__aiter__' in dir(endpoint):
res = None
async for y in endpoint:
res = await self.request(y)
return res
if inspect.isgenerator(endpoint):
res = None
for y in endpoint:
res = await self.request(y)
return res
return endpoint
[docs] def add_callback(
self,
callback: typing.Callable,
filter_: [
typing.Type[divera.triggers.Trigger],
typing.Callable,
Exception,
],
model: [
typing.Type[divera.models.Model],
typing.List[typing.Type[divera.models.Model]],
] = None,
):
"""
Add callbacks and rules on when and for which model they are to be called.
:param callback: The function to be called.
:param filter_: The class of the trigger or a function that returns True if the function is to be called.
:param model: The type of object this callback should be used with. Use None for any model, or a list of models for any model in that list.
:return: None
"""
def block_polling(func, args, kwargs):
self.block_polling += 1
res = func(*args, **kwargs)
self.block_polling -= 1
return res
self.callbacks.append(
[
lambda *args, **kwargs: block_polling(callback, args, kwargs),
filter_,
model,
],
)
[docs] async def process_event(
self,
event: [
divera.triggers.Trigger,
Exception,
],
):
for cb, f, m in self.callbacks:
if type(f) is list or type(f) is tuple:
results = []
for trigger_class in f:
results.append(issubclass(type(event), trigger_class))
if True in results:
pass
else:
continue
elif issubclass(type(event), f):
pass
else:
continue
if issubclass(type(event), divera.triggers.ObjectInteraction):
if m is None:
pass
elif type(event.object) == m:
pass
elif type(m) in [list, tuple] and type(event.object) in m:
pass
else:
continue
try:
cb_ = cb(event, self)
if inspect.iscoroutine(cb_):
await cb_
except Exception as e:
logging.error(
str().join(
traceback.format_exception(e),
)
)
await self.process_event(e)
[docs] async def handle_socket_data(self, data):
# Unspecified triggers are sorted last (index = True = 1), others are sorted first (index = False = 0)
subclasses = divera.triggers.socketevents.get_matching_subclasses(
data=data,
)
return await self.process_event(
subclasses[0](
data=data,
),
)
[docs] def store_object_to_config(
self,
obj,
):
# Add object list to storage if necessary
if type(obj).__name__ not in self.config['models']:
self.config['models'][type(obj).__name__] = []
if obj in self.config['models'][type(obj).__name__]:
logging.warning(
f'{type(obj).__name__} is already stored in config',
)
else:
self.config['models'][type(obj).__name__].append(obj)
[docs] def update_object_in_config(
self,
obj,
):
for i, o in enumerate(self.config['models'][type(obj).__name__]):
if o == obj:
self.config['models'][type(obj).__name__][i] = obj
[docs] def remove_object_from_config(
self,
obj,
):
if type(obj).__name__ not in self.config['models']:
return
i = len(self.config['models'][type(obj).__name__]) - 1
while i >= 0:
if self.config['models'][type(obj).__name__][i] == obj:
del self.config['models'][type(obj).__name__][i]
i -= 1
# Remove empty object lists from storage
if len(self.config['models'][type(obj).__name__]) == 0:
del self.config['models'][type(obj).__name__]
[docs] async def poll_once(
self,
models: [
typing.List[typing.Type[divera.models.Model]],
typing.Tuple[typing.Type[divera.models.Model]]
] = None,
):
if models is None:
models = divera.utils.get_all_models()
for model in models:
new_objects = await self.request(
model.get_all(),
)
if new_objects is None:
new_objects = []
if new_objects is None:
continue
async for change in divera.triggers.get_object_changes(
new_objects=new_objects,
old_objects=self.config['models'][model.__name__] if model.__name__ in self.config['models'] else [],
):
await self.process_event(change)
[docs] async def poll_until_stopped(self):
self.poll = True
while self.poll is True:
last_tms = self.config['access']['polling']['last_poll'] or 0
next_tms = last_tms + self.config['access']['polling']['interval']
delay = next_tms - datetime.datetime.now().timestamp()
if delay > 0:
await asyncio.sleep(
max(
delay,
10,
)
)
else:
if self.block_polling > 0:
continue
self.config['access']['polling']['last_poll'] = datetime.datetime.now().timestamp()
try:
await self.poll_once()
except aiohttp.client_exceptions.ClientConnectorError as e:
if 'Temporary' in str(e):
logging.error(''.join(traceback.format_exception(e)))
[docs] async def set_status(
self,
status: [int, divera.models.status.Status] = None,
note: str = None,
vehicle: int = None,
reset_date: [int, datetime.datetime] = None,
reset_to: int = None,
alarm_skip: bool = None,
status_skip_statusplan: bool = None,
status_skip_geofence: bool = None,
):
await self.request(
divera.api.v2.statusgeber.SetStatus(
id=status.id if type(status) is divera.Status else status,
note=note,
vehicle=vehicle,
reset_date=reset_date,
reset_to=reset_to,
alarm_skip=alarm_skip,
status_skip_statusplan=status_skip_statusplan,
status_skip_geofence=status_skip_geofence,
),
)
[docs] def start_polling(
self,
):
asyncio.run(
self.poll_until_stopped(
)
)
[docs] def setup_socket_callbacks(self):
"""
Setup all known events to call `self.handle_socket_data`
"""
known_events = [
'reconnect',
'reconnect_error',
'connect_error',
'connect_failed',
'user-dm',
'user-status',
'user-statusplan',
'cluster-monitor',
'cluster-monitor-central',
'cluster-pull',
'cluster-message',
'user-message',
'cluster-vehicle',
'cluster-vehicle-property',
'cluster-vehicle-crew',
'cluster-crewtarget',
]
for event in known_events:
self.socket.add_callback(
self.handle_socket_data,
event=event,
)
[docs] def setup_recommended_callbacks(self):
self.setup_socket_callbacks()
self.add_callback(
lambda event, client: self.store_object_to_config(event.object),
divera.ObjectCreated,
)
self.add_callback(
lambda event, client: self.update_object_in_config(event.object),
divera.AttributeChanged,
)
self.add_callback(
lambda event, client: self.remove_object_from_config(event.object),
divera.ObjectRemoved,
)
async def cluster_pull_callback(event: divera.ClusterPullTrigger, client: divera.Client):
if event.action == 1:
await self.process_event(
divera.ObjectCreated(
await self.request(
event.models[0](
data=event.data['pull'],
).get(), # Update the info of the object before processing it
),
),
)
return
elif event.action == 4:
await self.process_event(
divera.ObjectRemoved(
list(filter(
lambda o: o.__getattribute__(o.sorting_key),
self.config['models'][event.models[0].__name__],
))[0],
),
)
return
else:
if event.action is None:
if type(event) in [
divera.ClusterPullMessageChannel,
]:
logging.warning(
f'Expecting action info for {type(event)}: ignoring to prevent double processing',
)
return
logging.debug(
f'falling back to manual detection of changes for type {event} with action {event.action}',
)
if event.models is None:
logging.warning(f'{type(event)} does not relate to any models')
return
if 'object' in dir(event):
obj = event.object
else:
obj_tmp = event.models[0](id=event.data['pull']['id'])
obj = await self.request(
obj_tmp.get(),
)
async for change in divera.triggers.get_object_changes(
[obj] if obj is not None else [],
[
o
for o in client.config['models'][type(obj).__name__]
if o == obj
]
if type(obj).__name__ in client.config['models'] else
[],
):
if event.action is not None or True:
logging.debug(
' '.join([
'action',
str(event.action),
'seems to be related to',
str(type(change)),
]),
)
await client.process_event(
change,
)
self.add_callback(
cluster_pull_callback,
divera.ClusterPullTrigger,
)
async def remove_message(event, client):
await self.process_event(
divera.ObjectRemoved(
event.object,
),
)
self.add_callback(
remove_message,
divera.MessageDeletedEvent,
)
[docs] async def listen(self):
if self.use_recommended_callbacks:
self.setup_recommended_callbacks()
url_ws = self.config['access']['url websocket']
if url_ws is None:
url_ws = divera.utils.get_recommended_websocket_url(
self.config['access']['domain'],
)
await self.socket.connect(
url=url_ws,
)
self.main_task = asyncio.gather(
self.config.autosave(
interval=60,
indent=4,
json_encoder=divera.config.ModelEncoder,
),
self.poll_until_stopped(),
self.socket.client.wait(),
)
await self.main_task
self.main_task = None
[docs] async def close(self):
self.config.run_autosave = False
self.poll = False
if self.main_task is not None:
self.main_task.cancel()
logging.info(
f'{type(self).__name__} has been instructed to stop and is not processing new events.'
)
await self.socket.client.disconnect()