Source code for divera.client

import asyncio
import datetime
import inspect
import json
import logging
import os
import traceback
import typing
import warnings

import aiohttp.client_exceptions
import keyring
import versionedconfig as _vc

import divera.api.endpointwrappers
import divera.api.v2.auth
import divera.api.v2.pull
import divera.api.v2.statusgeber
import divera.config
from divera.config.migrations import *
import divera.models
import divera.models.status
import divera.socket
import divera.triggers
import divera.triggers.socketevents
import divera.utils


[docs]class Client: def __init__( self, config_folder: str = None, use_recommended_callbacks: bool = True, ): self.config_folder = os.path.expanduser(config_folder) if config_folder else None self.config_file = self.config_folder.rstrip('/') + '/' + 'divera.json' if config_folder else None self.use_recommended_callbacks = use_recommended_callbacks self.config = _vc.Configuration( {}, ) self.read_config() self.poll = False self.main_task = None self.callbacks = [] self.socket = divera.socket.SocketConnection( jwt=self.get_jwt, ucr=self.get_ucr, config=self.config['access']['socket'], ) if self.config_folder is None: logging.warning( 'You have not specified a configuration folder.' ' Make sure to call Client.logout() before you terminate your program.', ) self.block_polling = 0
[docs] def read_config(self): """ Load the configuration data from the `divera.json` inside the folder specified on `__init__`. The configuration also gets updated to the latest version. """ def detect_and_convert_objects(data): output = {} models = divera.utils.get_all_models() return_output = False for key in data: val = data[key] for m in models: if key != m.__name__: continue if type(val) is not list: continue for object_raw in val: if key not in output: output[key] = [] assert type(object_raw) is dict output[key].append( m(data=object_raw), ) return_output = True if return_output: return output else: return data if self.config_file: if os.path.exists(self.config_file): with open(self.config_file) as file: text = file.read() restored_config = json.loads( text, object_hook=detect_and_convert_objects, ) or {} file.close() else: restored_config = {} config = restored_config else: config = {} self.config = _vc.Configuration( _vc.migrate( config, list_of_migrations=[ mig() for mig in DiveraPythonConfigMigration.__subclasses__() ], ), path=self.config_file, )
[docs] def store_config(self): """ Save the configuration data to the `divera.json` inside the folder specified on `__init__`. """ if not self.config_file: warnings.warn("No file has been specified to store the configuration to.") return if not os.path.exists(self.config_folder): os.makedirs(self.config_folder) with open(self.config_file, 'w') as f: f.write( json.dumps( self.config, indent=4, cls=divera.config.ModelEncoder, ) ) f.close()
[docs] def get_username(self): return self.config['access']["username"]
[docs] def get_domain(self): return self.config['access']["domain"]
[docs] def get_protocol(self): return self.config['access']['protocol']
[docs] def get_url(self): return f"{self.get_protocol()}://{self.get_domain()}"
[docs] def set_domain( self, domain: str, ): self.config['access']['domain'] = domain.rstrip('/').split('://')[-1]
[docs] async def login( self, username, password, domain: str = None, url_websocket: str = None, ): """ Log in to an account. :param username: email address of the user :param password: password if the user :param domain: domain of the server (needed if not set in config yet) :param url_websocket: url of the websocket of the server for real time updated :return: """ if domain: self.set_domain( domain=domain, ) if self.config['access']['url websocket'] is None or url_websocket is not None: self.config['access']['url websocket'] = url_websocket if not self.get_domain(): raise ValueError('Cannot login without knowing the domain.') if username: self.config['access']['username'] = username if not self.get_username(): raise ValueError('Cannot login without knowing the user name.') login = divera.api.v2.auth.Login( username=self.get_username(), password=password, ) resp = await self.request( login, ) if resp['success']: access_token = resp['data']['user']['access_token'] await self.set_access_token( access_token=access_token, ) # Remove sensitive information before copying it to config del resp['data']['user']['access_token'] self.config['user'] = resp['data']['user'] else: raise PermissionError("unable to log in using provided credentials", resp)
[docs] async def login_with_access_token( self, username: str, access_token: str, domain: str, ): """ If the user should not be asked for a password, they can be asked for their access_token instead. This is also referred to as 'access key' and can be found in the DEBUG section of your account settings. :param username: Your username (email) :param access_token: The access token / access key from the DEBUG section of your account settings. :param domain: The domain of your divera instance """ self.config['access']['username'] = username self.set_domain( domain=domain, ) await self.set_access_token( access_token=access_token, )
[docs] def logout(self): """ Delete the password from keyring. """ if self.get_domain() and self.get_username(): keyring.delete_password( service_name=self.get_domain(), username=self.config['access']['username'], )
[docs] async def set_access_token(self, access_token: str): """ :param access_token: :return: """ keyring.set_password( service_name=self.get_domain(), username=self.get_username(), password=access_token, )
[docs] def get_access_token(self): try: return keyring.get_password( service_name=self.get_domain(), username=self.get_username(), ) except TypeError: return None
[docs] def is_logged_in(self): return bool(self.get_access_token())
[docs] async def get_jwt( self, version: str = '', ): """ Get a new jason web token :param version: If you add any of ['ws', 'api'] to get the respective jwt :return: jason web token as string """ jwt_response = await self.request( divera.api.v2.auth.JWT(), ) res = jwt_response['data']['jwt' + (f'_{version}' if version else '')] return res
[docs] async def get_ucr(self): pull_response = await self.request( divera.api.v2.pull.All(), ) return pull_response['data']['ucr_default']
[docs] async def perform_web_request( self, request, ): res = request( base_url=self.get_url(), access_token=self.get_access_token(), ) args, kwargs = res async with aiohttp.ClientSession() as session: async with session.__getattribute__(request.method.lower())( *args, **kwargs, ) as resp: raw_response = None if 'Content-Type' in resp.headers: if resp.headers['Content-Type'] == 'application/json': raw_response = await resp.json() elif resp.headers['Content-Type'].startswith('text'): raw_response = await resp.text() elif resp.headers['Content-Type'] == 'application/octet-stream': raw_response = await resp.content.read() else: # default to json raw_response = await resp.json() try: processing_result = request.process_result(raw_response) if inspect.iscoroutine(processing_result): return await processing_result else: return processing_result except Exception as e: logging.error(''.join(traceback.format_exception(e)))
[docs] async def request( self, endpoint: [ divera.api.endpointwrappers.EndPoint, typing.AsyncGenerator, typing.Generator, typing.Any, ], ) -> typing.Any: if issubclass(type(endpoint), divera.api.endpointwrappers.EndPoint): return await self.request( await self.perform_web_request(endpoint), ) if '__aiter__' in dir(endpoint): res = None async for y in endpoint: res = await self.request(y) return res if inspect.isgenerator(endpoint): res = None for y in endpoint: res = await self.request(y) return res return endpoint
[docs] def add_callback( self, callback: typing.Callable, filter_: [ typing.Type[divera.triggers.Trigger], typing.Callable, Exception, ], model: [ typing.Type[divera.models.Model], typing.List[typing.Type[divera.models.Model]], ] = None, ): """ Add callbacks and rules on when and for which model they are to be called. :param callback: The function to be called. :param filter_: The class of the trigger or a function that returns True if the function is to be called. :param model: The type of object this callback should be used with. Use None for any model, or a list of models for any model in that list. :return: None """ def block_polling(func, args, kwargs): self.block_polling += 1 res = func(*args, **kwargs) self.block_polling -= 1 return res self.callbacks.append( [ lambda *args, **kwargs: block_polling(callback, args, kwargs), filter_, model, ], )
[docs] async def process_event( self, event: [ divera.triggers.Trigger, Exception, ], ): for cb, f, m in self.callbacks: if type(f) is list or type(f) is tuple: results = [] for trigger_class in f: results.append(issubclass(type(event), trigger_class)) if True in results: pass else: continue elif issubclass(type(event), f): pass else: continue if issubclass(type(event), divera.triggers.ObjectInteraction): if m is None: pass elif type(event.object) == m: pass elif type(m) in [list, tuple] and type(event.object) in m: pass else: continue try: cb_ = cb(event, self) if inspect.iscoroutine(cb_): await cb_ except Exception as e: logging.error( str().join( traceback.format_exception(e), ) ) await self.process_event(e)
[docs] async def handle_socket_data(self, data): # Unspecified triggers are sorted last (index = True = 1), others are sorted first (index = False = 0) subclasses = divera.triggers.socketevents.get_matching_subclasses( data=data, ) return await self.process_event( subclasses[0]( data=data, ), )
[docs] def store_object_to_config( self, obj, ): # Add object list to storage if necessary if type(obj).__name__ not in self.config['models']: self.config['models'][type(obj).__name__] = [] if obj in self.config['models'][type(obj).__name__]: logging.warning( f'{type(obj).__name__} is already stored in config', ) else: self.config['models'][type(obj).__name__].append(obj)
[docs] def update_object_in_config( self, obj, ): for i, o in enumerate(self.config['models'][type(obj).__name__]): if o == obj: self.config['models'][type(obj).__name__][i] = obj
[docs] def remove_object_from_config( self, obj, ): if type(obj).__name__ not in self.config['models']: return i = len(self.config['models'][type(obj).__name__]) - 1 while i >= 0: if self.config['models'][type(obj).__name__][i] == obj: del self.config['models'][type(obj).__name__][i] i -= 1 # Remove empty object lists from storage if len(self.config['models'][type(obj).__name__]) == 0: del self.config['models'][type(obj).__name__]
[docs] async def poll_once( self, models: [ typing.List[typing.Type[divera.models.Model]], typing.Tuple[typing.Type[divera.models.Model]] ] = None, ): if models is None: models = divera.utils.get_all_models() for model in models: new_objects = await self.request( model.get_all(), ) if new_objects is None: new_objects = [] if new_objects is None: continue async for change in divera.triggers.get_object_changes( new_objects=new_objects, old_objects=self.config['models'][model.__name__] if model.__name__ in self.config['models'] else [], ): await self.process_event(change)
[docs] async def poll_until_stopped(self): self.poll = True while self.poll is True: last_tms = self.config['access']['polling']['last_poll'] or 0 next_tms = last_tms + self.config['access']['polling']['interval'] delay = next_tms - datetime.datetime.now().timestamp() if delay > 0: await asyncio.sleep( max( delay, 10, ) ) else: if self.block_polling > 0: continue self.config['access']['polling']['last_poll'] = datetime.datetime.now().timestamp() try: await self.poll_once() except aiohttp.client_exceptions.ClientConnectorError as e: if 'Temporary' in str(e): logging.error(''.join(traceback.format_exception(e)))
[docs] async def set_status( self, status: [int, divera.models.status.Status] = None, note: str = None, vehicle: int = None, reset_date: [int, datetime.datetime] = None, reset_to: int = None, alarm_skip: bool = None, status_skip_statusplan: bool = None, status_skip_geofence: bool = None, ): await self.request( divera.api.v2.statusgeber.SetStatus( id=status.id if type(status) is divera.Status else status, note=note, vehicle=vehicle, reset_date=reset_date, reset_to=reset_to, alarm_skip=alarm_skip, status_skip_statusplan=status_skip_statusplan, status_skip_geofence=status_skip_geofence, ), )
[docs] def start_polling( self, ): asyncio.run( self.poll_until_stopped( ) )
[docs] def setup_socket_callbacks(self): """ Setup all known events to call `self.handle_socket_data` """ known_events = [ 'reconnect', 'reconnect_error', 'connect_error', 'connect_failed', 'user-dm', 'user-status', 'user-statusplan', 'cluster-monitor', 'cluster-monitor-central', 'cluster-pull', 'cluster-message', 'user-message', 'cluster-vehicle', 'cluster-vehicle-property', 'cluster-vehicle-crew', 'cluster-crewtarget', ] for event in known_events: self.socket.add_callback( self.handle_socket_data, event=event, )
[docs] async def listen(self): if self.use_recommended_callbacks: self.setup_recommended_callbacks() url_ws = self.config['access']['url websocket'] if url_ws is None: url_ws = divera.utils.get_recommended_websocket_url( self.config['access']['domain'], ) await self.socket.connect( url=url_ws, ) self.main_task = asyncio.gather( self.config.autosave( interval=60, indent=4, json_encoder=divera.config.ModelEncoder, ), self.poll_until_stopped(), self.socket.client.wait(), ) await self.main_task self.main_task = None
[docs] async def close(self): self.config.run_autosave = False self.poll = False if self.main_task is not None: self.main_task.cancel() logging.info( f'{type(self).__name__} has been instructed to stop and is not processing new events.' ) await self.socket.client.disconnect()