Source code for eventstreamr2.configuration

from eventstreamr2.utils.events import Observable
from eventstreamr2.utils.collections import AbstractPriorityDictionary
import itertools



class ConfigurationManager(Observable):
    """
    This object manages storing and accessing configurations.

    Individual configurations are stored on a 2-level hierarchy and can be
    ordered using an arbitrary priority system. The 2 levels are named
    :code:`service_name` and :code:`source_name` to represent the service and
    the source of the configuration respectively.

    All configurations under a common :code:`service_name` are aggregated
    using the :class:`ServiceConfigurationWrapper` which allows for easy
    access to individual configuration items.

    The priority given to :code:`set_config` is taken to mean that the
    larger (numerically) the number, the more important it is. So in the case
    where 2 configurations provide the same key; the configuration with the
    larger priority number will be used.

    .. note::
        For every successful modification, the observers are notified with 0
        or 1 parameters:

        - *Service Name* if there is only a single service affected.

    .. warning::
        A notification of modification may not actually affect the data.

    .. note::
        Internally the data is keyed :code:`source` -> :code:`service`, and
        the priority of a source is stored next to its services under a
        private sentinel key.

    :ivar _configs: A :code:`dict` storing all the configurations.
    """

    # Sentinel key holding a source's priority inside its own dictionary.
    # A unique object can never collide with a real service name.
    _priority_key = object()

    def __init__(self):
        super(ConfigurationManager, self).__init__()
        self._configs = {}

    def __str__(self):
        """
        Render the stored configurations as indented JSON.

        The priority sentinel is not a valid JSON key (``json.dumps`` raises
        :code:`TypeError` for it), so it is shown as ``"<priority>"`` in a
        copy; the stored data is left untouched.
        """
        import json

        printable = {}
        for source_name, source_cfg in self._configs.items():
            printable[source_name] = {
                ("<priority>" if key is self._priority_key else key): value
                for key, value in source_cfg.items()
            }
        return json.dumps(printable, indent=2)

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, str(self))

    def get_config(self, service_name):
        """
        Gets the service's configuration wrapped in a
        :class:`ServiceConfigurationWrapper`.

        :param service_name: The name of the service.
        :return: A read only dictionary like object that manages retrieving
            the configuration in a transparent manner.
        """
        return ServiceConfigurationWrapper(self, service_name)

    def set_source_priority(self, source_name, new_source_priority):
        """
        Sets (or replaces) the priority of a source.

        The source is created if it does not exist yet. Observers are
        notified without a service name because a priority change can
        reorder the configurations of every service the source provides.

        :param source_name: The name of the source.
        :param new_source_priority: The new priority of the source.
        """
        source_cfg = self._configs.setdefault(source_name, {})
        source_cfg[self._priority_key] = new_source_priority
        self._notify_observers()

    def set_config(self, service_name, source_name, source_priority, new_config=None):
        """
        Assigns the given configuration to the service's configuration.

        If there is an existing (:code:`service_name`, :code:`source_name`)
        pair then it will be overwritten with the values given. The source
        name can be reused over multiple service names without any ill
        effects.

        If ``new_config`` is missing; it is assumed that no priority is
        provided and the third argument is the configuration itself, i.e.
        ``set_config(service_name, source_name, config)``. The priority
        already stored for the source (if any) is then left unchanged.

        :param service_name: The name of the service.
        :param source_name: The name of the source.
        :param source_priority: The priority of the source.
        :type source_priority: number or dict
        :param new_config: A mapping containing the configuration.
        :type new_config: dict or None
        """
        if new_config is None:
            # 3-argument form: the value passed as priority is the config.
            source_priority, new_config = new_config, source_priority

        source_cfg = self._configs.setdefault(source_name, {})
        if source_priority is not None:
            source_cfg[self._priority_key] = source_priority
        source_cfg[service_name] = new_config
        self._notify_observers(service_name)

    def delete_source(self, source_name):
        """
        Removes a source together with its priority and all the service
        configurations it provides.

        No operation is performed (and nobody is notified) if the source is
        missing.

        :param source_name: The name of the source to remove.
        """
        if source_name in self._configs:
            self._configs.pop(source_name)
            self._notify_observers()

    def delete_service_config(self, service_name, source_name=None):
        """
        Removes all configurations under :code:`service_name` and if
        specified :code:`source_name`.

        If :code:`source_name` is not specified all configurations are
        removed. No operation is performed if :code:`service_name` is
        missing, the same applies if the pair :code:`service_name` and
        :code:`source_name` are missing.

        .. note::
            This will only fire listeners when it modifies the underlying
            data. It may not actually change the *end-user* data.

        :param service_name: The name of the service to remove.
        :param source_name: The name of source to remove.
        """
        if source_name is not None:
            source_cfg = self._configs.get(source_name)
            if source_cfg is not None and service_name in source_cfg:
                source_cfg.pop(service_name)
                self._notify_observers(service_name)
            return

        modified = False
        for source_cfg in self._configs.values():
            if service_name in source_cfg:
                source_cfg.pop(service_name)
                modified = True
        if modified:
            # One notification, however many sources were touched.
            self._notify_observers(service_name)
class ServiceConfigurationWrapper(AbstractPriorityDictionary, Observable):
    """
    An immutable dictionary which allows easier access to the values stored
    in :class:`ConfigurationManager`.

    It is preferable to use
    :meth:`get_config <ConfigurationManager.get_config>` to create an
    instance of this class.

    .. warning::
        A notification of modification may not actually affect the data.

    .. seealso::
        :class:`AbstractPriorityDictionary` for more information about the
        available functions.
    """

    def __init__(self, config_manager, service_name):
        super(ServiceConfigurationWrapper, self).__init__()
        self.service_name = service_name
        self.config_manager = config_manager
        # Subscribe so that manager changes can be relayed to our observers.
        config_manager.add_weak_observer(self._cfg_mgr_update)

    def __repr__(self):
        parts = (self.__class__.__name__, self.config_manager, self.service_name)
        return "%s(%r, %r)" % parts

    def _cfg_mgr_update(self, mgr, service_name=None):
        """
        A callback for the configuration manager.

        Relays the notification to this object's observers when it carries
        no service name, or when it names the service this wrapper is for.

        :param mgr: The notifying manager (unused).
        :param service_name: The affected service, or :code:`None`.
        """
        unspecified = service_name is None
        if unspecified or service_name == self.service_name:
            # NOTE(review): the wrapper itself is passed as the notification
            # argument; confirm against Observable that this is intended.
            self._notify_observers(self)

    def _ordered_configs(self):
        """
        Yield this service's configurations, largest source priority first.

        Sources that have no priority assigned are skipped, as are sources
        that hold no configuration for this service.
        """
        priority_key = ConfigurationManager._priority_key
        # Only sources with a priority can be ranked.
        ranked = [
            source_cfg
            for source_cfg in self.config_manager._configs.values()
            if priority_key in source_cfg
        ]
        ranked.sort(key=lambda source_cfg: source_cfg[priority_key], reverse=True)
        for source_cfg in ranked:
            if self.service_name in source_cfg:
                yield source_cfg[self.service_name]