Source code for granicus_archiver.config

from __future__ import annotations
from typing import (
    ClassVar, Literal, Iterator, Any, Self, get_type_hints, TYPE_CHECKING,
)
from abc import abstractmethod
from pathlib import Path
import os
from os import PathLike
import json
import dataclasses
from dataclasses import dataclass, field
from zoneinfo import ZoneInfo

from yarl import URL

from yaml import (
    load as yaml_load,
    dump as yaml_dump,
    CLoader as YamlLoader,
    CDumper as YamlDumper,
)
from loguru import logger
from appdirs import AppDirs

from .types import Serializable
if TYPE_CHECKING:
    from .clips.model import Location
    from .legistar.types import Category
    from .aws.client import Key
else:
    Location = str
    Category = str

GroupKey = Literal['root', 'google', 'aws', 'legistar']
""""""


APP_NAME = 'granicus-archiver'
APP_AUTHOR = 'granicus-archiver'

APP_DIRS = AppDirs(APP_NAME, APP_AUTHOR)


[docs] def get_app_config(*parts: Path|str) -> Path: """Get the user config directory for the app Any arguments (if provided) will be joined to the path """ return Path(APP_DIRS.user_config_dir, *parts)
[docs] def get_app_cache(*parts: Path|str) -> Path: """Get the user cache directory for the app Any arguments (if provided) will be joined to the path """ return Path(APP_DIRS.user_cache_dir, *parts)
[docs] @dataclass class BaseConfig(Serializable): group_key: ClassVar[GroupKey] """Unique key for :class:`BaseConfig` subclasses""" _env_prefix: ClassVar[str] = 'GRANICUS_ARCHIVER' @classmethod def _get_env_key(cls, key: str) -> str: return f'{cls._env_prefix}_{cls.group_key.upper()}_{key.upper()}'
[docs] @classmethod def iter_child_config_classes(cls) -> Iterator[tuple[str, type[BaseConfig]]]: """Iterate over child config classes """ for attr, val_type in get_type_hints(cls).items(): if not isinstance(val_type, type): continue if issubclass(val_type, BaseConfig): yield attr, val_type
@property def child_configs(self) -> dict[str, BaseConfig]: """Mapping of child config instances by their attribute name """ return { k:getattr(self, k) for k, _ in self.iter_child_config_classes() }
[docs] @abstractmethod def update(self, **kwargs) -> bool: """Update the config from keyword arguments """
[docs] @classmethod @abstractmethod def build_defaults(cls, **kwargs) -> Self: """Create the config using defaults Any provided keyword arguments will override the default """
[docs] @classmethod @abstractmethod def load_from_env(cls) -> Self: """Load the config from environment variables """
[docs] def as_dotenv(self) -> str: """Convert the config to a dotenv string """ data = self.serialize() child_configs = self.child_configs lines = [] for key, val in data.items(): if key in child_configs: child_conf = child_configs[key] lines.extend(child_conf.as_dotenv().splitlines()) else: if isinstance(val, (list, dict)): val = json.dumps(val) lines.append(f'{self._get_env_key(key)}={val}') return '\n'.join(lines)
@classmethod def _get_env_var[Vt: (str, int, bool, Path, URL)]( cls, key: str, val_type: type[Vt] ) -> Vt|None: env_key = cls._get_env_key(key) if env_key not in os.environ: return None val = os.environ[env_key].strip() return val_type(val) @classmethod def _get_env_var_list[Vt: (str, int, bool, Path, URL)]( cls, key: str, val_type: type[Vt] ) -> list[Vt]|None: val = cls._get_env_var(key, str) if val is None: return None val_list = json.loads(val) return [val_type(v) for v in val_list] @classmethod def _get_env_var_dict[Kt: (str), Vt: (str, int, bool, Path, URL)]( cls, key: str, key_type: type[Kt], val_type: type[Vt] ) -> dict[Kt, Vt]|None: val = cls._get_env_var(key, str) if val is None: return None d = json.loads(val) return {key_type(k):val_type(v) for k,v in d.items()}
[docs] @dataclass class GoogleConfig(BaseConfig): """Google config """ user_credentials_filename: Path """Path to store OAuth credentials""" drive_folder: Path """Root folder name to upload within Drive""" legistar_drive_folder: Path """Root folder name to upload legistar items within Drive""" rguid_legistar_drive_folder: Path """Root folder name to upload rguid legistar items within Drive""" folder_cache_file: Path """Path to store folder cache""" meta_cache_file: Path """Path to store metadata cache""" group_key: ClassVar[GroupKey] = 'google'
[docs] def update(self, **kwargs) -> bool: changed = False keys = [ 'user_credentials_filename', 'drive_folder', 'legistar_drive_folder', 'rguid_legistar_drive_folder', ] for key in keys: if key not in kwargs: continue val = kwargs[key] if val == getattr(self, key): continue assert isinstance(val, Path) setattr(self, key, val) changed = True return changed
[docs] @classmethod def build_defaults(cls, **kwargs) -> Self: default_kw = dict( user_credentials_filename=Path.home() / '.granicus-oauth-user.json', drive_folder=Path('granicus-archive/data/granicus'), legistar_drive_folder=Path('granicus-archive/data/legistar'), rguid_legistar_drive_folder=Path('granicus-archive/data/legistar-rguid'), folder_cache_file=get_app_cache('google-folder-cache.json'), meta_cache_file=get_app_cache('google-meta-cache.json'), ) for key, val in default_kw.items(): kwargs.setdefault(key, val) return cls(**kwargs)
def serialize(self) -> dict[str, Any]: return dict( user_credentials_filename=str(self.user_credentials_filename), drive_folder=str(self.drive_folder), legistar_drive_folder=str(self.legistar_drive_folder), rguid_legistar_drive_folder=str(self.rguid_legistar_drive_folder), folder_cache_file=str(self.folder_cache_file), meta_cache_file=str(self.meta_cache_file), ) @classmethod def deserialize(cls, data: dict[str, Any]) -> Self: rg_folder = data.get( 'rguid_legistar_drive_folder', 'granicus-archive/data/legistar-rguid' ) for key in ['folder_cache_file', 'meta_cache_file']: if key not in data: obj = cls.build_defaults() data[key] = getattr(obj, key) return cls( user_credentials_filename=Path(data['user_credentials_filename']), drive_folder=Path(data['drive_folder']), legistar_drive_folder=Path(data['legistar_drive_folder']), rguid_legistar_drive_folder=Path(rg_folder), folder_cache_file=Path(data['folder_cache_file']), meta_cache_file=Path(data['meta_cache_file']), )
[docs] @classmethod def load_from_env(cls) -> Self: kw = dict( user_credentials_filename=cls._get_env_var('user_credentials_filename', Path), drive_folder=cls._get_env_var('drive_folder', Path), legistar_drive_folder=cls._get_env_var('legistar_drive_folder', Path), rguid_legistar_drive_folder=cls._get_env_var('rguid_legistar_drive_folder', Path), folder_cache_file=cls._get_env_var('folder_cache_file', Path), meta_cache_file=cls._get_env_var('meta_cache_file', Path), ) kw = {k:v for k,v in kw.items() if v is not None} return cls.build_defaults(**kw)
[docs] @dataclass class AWSConfig(BaseConfig): """AWS Config """ default_object_url_fmt: ClassVar[str] = 'https://s3.amazonaws.com/{bucket_name}/{key}' """Default :attr:`object_url_format`""" bucket_name: str """The bucket to use for the archive""" clips_prefix: Path """Prefix for clips""" legistar_prefix: Path """Prefix for legistar items""" legistar_rguid_prefix: Path """Prefix for rguid legistar items""" region_name: str|None = None """AWS region name. If not set, the default region will be used""" s3_endpoint_url: URL|None = None """AWS S3 endpoint URL. If not set, the default endpoint will be used""" credentials_profile: str = 'default' """The AWS credentials profile to use (from ``~/.aws/credentials``)""" access_key_id: str|None = None """AWS Access Key ID. If not set, the default credentials provider chain will be used""" secret_access_key: str|None = None """AWS Secret Access Key. If not set, the default credentials provider chain will be used""" object_url_format: str = default_object_url_fmt """Format string for generating object URLs. Required fields are - :attr:`bucket_name` - ``key`` (the S3 object key name) """ group_key: ClassVar[GroupKey] = 'aws' @classmethod def _validate_object_url_format(cls, fmt: str) -> bool: try: fmt.format(bucket_name='test-bucket', key='test/key') except KeyError as e: raise ValueError(f'Invalid object_url_format, missing key: {e}') from e return True
[docs] def update(self, **kwargs) -> bool: changed = False not_required_keys = [ 'region_name', 's3_endpoint_url', 'credentials_profile', 'access_key_id', 'secret_access_key', ] path_keys = ['clips_prefix', 'legistar_prefix', 'legistar_rguid_prefix'] required_keys = ['bucket_name'] + path_keys keys = required_keys + not_required_keys for key in keys: if key not in kwargs: continue val = kwargs[key] if val == getattr(self, key): continue if key in path_keys: assert isinstance(val, Path) assert not val.is_absolute() setattr(self, key, val) changed = True if 'object_url_format' in kwargs: val = kwargs['object_url_format'] if val != self.object_url_format: self._validate_object_url_format(val) self.object_url_format = val changed = True return changed
[docs] def get_object_url(self, key: Key, scheme: str|None = None) -> URL: """Get a URL for an S3 key within :attr:`bucket_name` """ fmt_kwargs = dict( bucket_name=self.bucket_name, key=key, ) if self.region_name is not None: fmt_kwargs['region_name'] = self.region_name if self.s3_endpoint_url is not None: fmt_kwargs['s3_endpoint_url'] = str(self.s3_endpoint_url) url_str = self.object_url_format.format(**fmt_kwargs) url = URL(url_str) if scheme is not None: url = url.with_scheme(scheme) return url
[docs] @classmethod def build_defaults(cls, **kwargs) -> Self: default_kw = dict( bucket_name='', clips_prefix=Path('clips'), legistar_prefix=Path('legistar'), legistar_rguid_prefix=Path('legistar-rguid'), region_name=None, s3_endpoint_url=None, credentials_profile='default', access_key_id=None, secret_access_key=None, object_url_format=cls.default_object_url_fmt, ) for key, val in default_kw.items(): kwargs.setdefault(key, val) return cls(**kwargs)
def serialize(self) -> dict[str, Any]: return dict( bucket_name=self.bucket_name, clips_prefix=str(self.clips_prefix), legistar_prefix=str(self.legistar_prefix), legistar_rguid_prefix=str(self.legistar_rguid_prefix), region_name=self.region_name, s3_endpoint_url=str(self.s3_endpoint_url) if self.s3_endpoint_url else None, credentials_profile=self.credentials_profile, access_key_id=self.access_key_id, secret_access_key=self.secret_access_key, object_url_format=self.object_url_format, ) @classmethod def deserialize(cls, data: dict[str, Any]) -> Self: return cls( bucket_name=data['bucket_name'], clips_prefix=Path(data['clips_prefix']), legistar_prefix=Path(data['legistar_prefix']), legistar_rguid_prefix=Path(data['legistar_rguid_prefix']), region_name=data.get('region_name'), s3_endpoint_url=URL(data['s3_endpoint_url']) if data.get('s3_endpoint_url') else None, access_key_id=data.get('access_key_id'), secret_access_key=data.get('secret_access_key'), credentials_profile=data.get('credentials_profile', 'default'), object_url_format=data.get('object_url_format', cls.default_object_url_fmt), )
[docs] @classmethod def load_from_env(cls) -> Self: credentials_profile = cls._get_env_var('credentials_profile', str) if credentials_profile is None or not len(credentials_profile): credentials_profile = 'default' kw = dict( bucket_name=cls._get_env_var('bucket_name', str), clips_prefix=cls._get_env_var('clips_prefix', Path), legistar_prefix=cls._get_env_var('legistar_prefix', Path), legistar_rguid_prefix=cls._get_env_var('legistar_rguid_prefix', Path), region_name=cls._get_env_var('region_name', str), s3_endpoint_url=cls._get_env_var('s3_endpoint_url', URL), credentials_profile=credentials_profile, access_key_id=cls._get_env_var('access_key_id', str), secret_access_key=cls._get_env_var('secret_access_key', str), object_url_format=cls._get_env_var('object_url_format', str), ) kw = {k:v for k,v in kw.items() if v is not None} return cls.build_defaults(**kw)
[docs] @dataclass class LegistarConfig(BaseConfig): """Legistar Config """ out_dir: Path """Root directory to store downloaded files (relatve to the current working directory). Defaults to ``data/legistar`` """ out_dir_abs: Path """Root directory to store downloaded files (absolute path) """ data_file: Path """Filename to store parsed data. Defaults to "<out-dir>/data.json" """ search_index_dir: Path """Directory to store the search index (relative to the current working directory)""" feed_urls: dict[str, URL] = field(default_factory=dict) """Mapping of calendar RSS feed urls with user-defined names as keys """ feed_overflows_allowed: list[str] = field(default_factory=list) """A list of feed names (keys of :attr:`feed_urls`) that are allowed to reach the 100 item limit described in :meth:`.legistar.rss_parser.Feed.from_feed` """ category_maps: dict[Location, Category] = field(default_factory=dict) """A :class:`dict` of any custom mappings to match the :attr:`Clip.location <.model.Clip.location>` fields to their appropriate :attr:`.legistar.rss_parser.FeedItem.category` The keys for this should be the ``location`` with the values set to the ``category``. """ group_key: ClassVar[GroupKey] = 'legistar'
[docs] def is_feed_overflow_allowed(self, feed: str|URL) -> bool: """Check whether the given feed name or url is allowed to overflow If a string is supplied this returns whether it is present in :attr:`feed_overflows_allowed`. If a :class:`~yarl.URL` is supplied, :attr:`feed_urls` will be searched and the matching key (if any) will be checked. """ if isinstance(feed, URL): urls_rev = {v:k for k,v in self.feed_urls.items()} key = urls_rev.get(feed) else: key = feed return key in self.feed_overflows_allowed
[docs] def update(self, **kwargs) -> bool: changed = False out_dir = kwargs.get('out_dir') if out_dir is not None and out_dir != self.out_dir: assert isinstance(out_dir, Path) assert not out_dir.is_absolute() self.out_dir = out_dir self.out_dir_abs = out_dir.resolve() changed = True out_dir_abs = kwargs.get('out_dir_abs') if out_dir_abs is not None and out_dir_abs != self.out_dir_abs: assert isinstance(out_dir_abs, Path) assert out_dir_abs.is_absolute() self.out_dir_abs = out_dir_abs self.out_dir = out_dir_abs.relative_to(Path.cwd()) changed = True data_file = kwargs.get('data_file') if data_file is not None: assert isinstance(data_file, Path) self.data_file = data_file changed = True for key in ['feed_urls', 'category_maps']: if key not in kwargs: continue cur_val = getattr(self, key) cur_val.update(kwargs[key]) changed = True feed_overflows = kwargs.get('feed_overflows_allowed') if feed_overflows is not None: cur_val = self.feed_overflows_allowed s = set(cur_val) | set(feed_overflows) if s != set(cur_val): self.feed_overflows_allowed = list(s) changed = True search_index_dir = kwargs.get('search_index_dir') if search_index_dir is not None and search_index_dir != self.search_index_dir: assert isinstance(search_index_dir, Path) assert not search_index_dir.is_absolute() self.search_index_dir = search_index_dir changed = True return changed
[docs] @classmethod def build_defaults(cls, **kwargs) -> Self: out_dir = Path('data') / 'legistar' out_dir_abs = out_dir.resolve() default_kw = dict( out_dir=out_dir, out_dir_abs=out_dir_abs, data_file=out_dir / 'data.json', feed_urls={}, feed_overflows_allowed=[], category_maps={}, search_index_dir=out_dir / '_search-index', ) for key, val in default_kw.items(): kwargs.setdefault(key, val) return cls(**kwargs)
def serialize(self) -> dict[str, Any]: path_attrs = ['out_dir', 'out_dir_abs', 'data_file'] d: dict[str, object] = {k: str(getattr(self, k)) for k in path_attrs} d.update(dict( feed_urls={k:str(v) for k,v in self.feed_urls.items()}, feed_overflows_allowed=self.feed_overflows_allowed, category_maps=self.category_maps, search_index_dir=str(self.search_index_dir), )) return d @classmethod def deserialize(cls, data: dict[str, Any]) -> Self: return cls( out_dir=Path(data['out_dir']), out_dir_abs=Path(data['out_dir_abs']), data_file=Path(data['data_file']), feed_urls={k:URL(v) for k,v in data['feed_urls'].items()}, feed_overflows_allowed=data.get('feed_overflows_allowed', []), category_maps=data['category_maps'], search_index_dir=Path(data['search_index_dir']), )
[docs] @classmethod def load_from_env(cls) -> Self: kw = dict( out_dir=cls._get_env_var('out_dir', Path), out_dir_abs=cls._get_env_var('out_dir_abs', Path), data_file=cls._get_env_var('data_file', Path), feed_urls=cls._get_env_var_dict('feed_urls', str, URL), feed_overflows_allowed=cls._get_env_var_list('feed_overflows_allowed', str), category_maps=cls._get_env_var_dict('category_maps', str, str), search_index_dir=cls._get_env_var('search_index_dir', Path), ) kw = {k:v for k,v in kw.items() if v is not None} return cls.build_defaults(**kw)
[docs] @dataclass class Config(BaseConfig): out_dir: Path """Root directory to store downloaded files (relatve to the current working directory) """ out_dir_abs: Path """Root directory to store downloaded files (absolute path) """ data_file: Path """Filename to store download information. Defaults to "<out-dir>/data.json" """ timestamp_file: Path """Filename to store clip timestamp information. Defaults to "<out-dir>/timestamp-data.yaml" """ granicus_data_url: URL|None """URL for granicus clip data """ legistar: LegistarConfig """:class:`LegistarConfig` instance """ google: GoogleConfig """:class:`GoogleConfig` instance """ aws: AWSConfig """:class:`AWSConfig` instance """ local_timezone_name: str|None """Local timezone name for all granicus / legistar items""" default_filename: ClassVar[Path] = get_app_config('config.yaml') group_key: ClassVar[GroupKey] = 'root' app_dirs: ClassVar[AppDirs] = AppDirs(APP_NAME, APP_AUTHOR) _read_only: ClassVar[bool] = True def __post_init__(self) -> None: assert self.out_dir != self.legistar.out_dir assert self.out_dir_abs != self.legistar.out_dir_abs assert self.data_file != self.legistar.data_file assert self.data_file.resolve() != self.legistar.data_file.resolve()
[docs] @classmethod def get_app_config(cls, *parts: Path|str) -> Path: """Get the user config directory for the app Any arguments (if provided) will be joined to the path """ return get_app_config(*parts)
[docs] @classmethod def get_app_cache(cls, *parts: Path|str) -> Path: """Get the user cache directory for the app Any arguments (if provided) will be joined to the path """ return get_app_cache(*parts)
@property def local_timezone(self) -> ZoneInfo: if self.local_timezone_name is None: raise ValueError('local timezone not set') return ZoneInfo(self.local_timezone_name)
[docs] def get_group(self, key: GroupKey) -> BaseConfig: """Get a :class:`BaseConfig` instance by its :attr:`~BaseConfig.group_key` """ if key == 'root': return self obj = getattr(self, key) return obj
[docs] @classmethod def load(cls, filename: PathLike) -> Self: """Load the config from the given filename """ if not isinstance(filename, Path): filename = Path(filename) if not filename.exists(): logger.warning(f'Config file {filename} does not exist. Using defaults') return cls.build_defaults() data = yaml_load(filename.read_text(), Loader=YamlLoader) return cls.deserialize(data)
[docs] def save(self, filename: PathLike) -> None: """Save the config to the given filename """ if self._read_only: raise ValueError('Config is read-only') if not isinstance(filename, Path): filename = Path(filename) filename.parent.mkdir(parents=True, exist_ok=True) s = yaml_dump(self.serialize(), Dumper=YamlDumper) filename.write_text(s)
[docs] def update(self, **kwargs) -> bool: changed = False out_dir = kwargs.get('out_dir') if out_dir is not None and out_dir != self.out_dir: assert isinstance(out_dir, Path) assert not out_dir.is_absolute() self.out_dir = out_dir self.out_dir_abs = out_dir.resolve() changed = True out_dir_abs = kwargs.get('out_dir_abs') if out_dir_abs is not None and out_dir_abs != self.out_dir_abs: assert isinstance(out_dir_abs, Path) assert out_dir_abs.is_absolute() self.out_dir_abs = out_dir_abs self.out_dir = out_dir_abs.relative_to(Path.cwd()) changed = True path_attrs = ['data_file', 'timestamp_file'] child_configs = self.child_configs for key, val in kwargs.items(): if val is None: continue if key in path_attrs and val != getattr(self, key): assert isinstance(val, Path) setattr(self, key, val) changed = True elif key == 'granicus_data_url': val = URL(val) if self.granicus_data_url == val: continue self.granicus_data_url = val changed = True elif key in child_configs: if child_configs[key].update(**val): changed = True elif key == 'local_timezone_name': if val == self.local_timezone_name: continue self.local_timezone_name = val changed = True return changed
[docs] @classmethod def build_defaults(cls, **kwargs) -> Self: out_dir = Path('data/granicus') out_dir_abs = out_dir.resolve() # out_dir_abs: Path = kwargs.get('out_dir', Path.cwd() / 'data') # out_dir = out_dir_abs.relative_to(Path.cwd()) data_url = kwargs.get('data_url') if data_url is not None: data_url = URL(data_url) feed_url = kwargs.get('legistar_feed_url') if feed_url is not None: feed_url = URL(feed_url) default_kw = dict( out_dir=out_dir, out_dir_abs=out_dir_abs, data_file=out_dir / 'data.json', timestamp_file=out_dir / 'timestamp-data.yaml', granicus_data_url=data_url, local_timezone_name=None, ) for attr, conf_cls in cls.iter_child_config_classes(): if attr in kwargs and isinstance(kwargs[attr], conf_cls): continue val = conf_cls.build_defaults(**kwargs.get(attr, {})) kwargs[attr] = val for key, val in default_kw.items(): kwargs.setdefault(key, val) return cls(**kwargs)
def serialize(self) -> dict[str, Any]: path_attrs = ['out_dir', 'out_dir_abs', 'data_file', 'timestamp_file'] d: dict[str, object] = {k: str(getattr(self, k)) for k in path_attrs} data_url = self.granicus_data_url d['granicus_data_url'] = None if data_url is None else str(data_url) for key, child_conf in self.child_configs.items(): d[key] = child_conf.serialize() d['local_timezone_name'] = self.local_timezone_name return d @classmethod def deserialize(cls, data: dict[str, Any]) -> Self: path_attrs = ['out_dir', 'out_dir_abs', 'data_file', 'timestamp_file'] kw: dict[str, Any] = {k: Path(data[k]) for k in path_attrs} data_url = data.get('granicus_data_url') if data_url is not None: data_url = URL(data_url) for key, val_type in cls.iter_child_config_classes(): if key not in data: child_conf = val_type.build_defaults() else: child_conf = val_type.deserialize(data[key]) kw[key] = child_conf return cls( granicus_data_url=data_url, local_timezone_name=data.get('local_timezone_name'), **kw )
[docs] @classmethod def load_from_env(cls) -> Self: kw: dict[str, Any] = dict( out_dir=cls._get_env_var('out_dir', Path), out_dir_abs=cls._get_env_var('out_dir_abs', Path), data_file=cls._get_env_var('data_file', Path), timestamp_file=cls._get_env_var('timestamp_file', Path), granicus_data_url=cls._get_env_var('granicus_data_url', URL), local_timezone_name=cls._get_env_var('local_timezone_name', str), ) kw = {k:v for k,v in kw.items() if v is not None} for key, val_type in cls.iter_child_config_classes(): kw[key] = val_type.load_from_env() return cls.build_defaults(**kw)