# Source code for granicus_archiver.aws.client

from __future__ import annotations
from typing import (
    TypeVar, Generic, Iterator, AsyncGenerator, Literal, Self, TypedDict,
    cast, overload, TYPE_CHECKING
)
from abc import ABC, abstractmethod
from pathlib import Path
import contextlib
import mimetypes
from urllib.parse import urlencode
import asyncio
import sys

from loguru import logger

import aiojobs
import aioboto3
import botocore.exceptions
from yarl import URL

if TYPE_CHECKING or 'sphinx.ext.autodoc' in sys.modules:
    from types_aiobotocore_s3 import S3ServiceResource, S3Client
    from types_aiobotocore_s3.service_resource import Bucket, Object, ObjectSummary

else:
    S3ServiceResource = object
    S3Client = object
    Bucket = object
    Object = object
    ObjectSummary = object



from ..config import Config
from ..types import FileMeta
from ..clips.model import Clip, ClipFileUploadKey, ClipCollection
from ..legistar.types import GUID, REAL_GUID, LegistarFileUID, _GuidT, _ItemT
from ..legistar.model import AbstractLegistarModel, LegistarData, DetailPageResult
from ..legistar.guid_model import RGuidDetailResult, RGuidLegistarData
from ..utils import (
    CompletionCounts, JobWaiters,
    HashMismatchError, SHA1Hash, get_file_hash_async
)

Key = str|Path
ACL = Literal[
    'private', 'public-read', 'public-read-write', 'authenticated-read',
    'aws-exec-read', 'bucket-owner-read', 'bucket-owner-full-control'
]

class SchedulersTD(TypedDict):
    """Named :class:`aiojobs.Scheduler` instances owned by :class:`ClipClient`
    """
    # Created without a limit; NOTE(review): no visible code in this module
    # spawns jobs on it — confirm whether it is still needed.
    general: aiojobs.Scheduler
    # Runs ClipClient.upload_clip jobs (one per clip)
    clip_uploads: aiojobs.Scheduler
    # Runs the per-file upload jobs spawned inside ClipClient.upload_clip
    uploads: aiojobs.Scheduler
    # Runs ClipClient.check_clip_needs_upload jobs
    upload_checks: aiojobs.Scheduler
def _key_to_str(key: Key) -> str: if isinstance(key, Path): assert not key.is_absolute() return str(key) return key
class S3ClientKwargs(TypedDict, total=False):
    """Keyword arguments passed to ``session.resource('s3', ...)`` and
    ``session.client('s3', ...)`` by :class:`ClientBase`
    """
    # Copied from ``config.aws.region_name``
    region_name: str | None
    # ``str(config.aws.s3_endpoint_url)``, or ``None`` when that is unset
    endpoint_url: str | None
class ClientBase:
    """Base class for AWS clients

    Use as an async context manager: entering opens :attr:`s3_resource` and
    :attr:`s3_client` and resolves :attr:`bucket`; exiting closes them.
    """
    session: aioboto3.Session
    """The boto3 session"""
    s3_resource: S3ServiceResource
    """The S3 service resource"""
    s3_client: S3Client
    """The S3 client"""
    bucket: Bucket
    """The S3 bucket"""

    def __init__(self, config: Config) -> None:
        self.config = config
        profile_name = self.config.aws.credentials_profile
        if (
            self.config.aws.access_key_id is not None
            and self.config.aws.secret_access_key is not None
        ):
            # Explicit credentials take precedence over a named profile
            profile_name = None
        self.session = aioboto3.Session(
            profile_name=profile_name,
            region_name=self.config.aws.region_name,
            aws_access_key_id=self.config.aws.access_key_id,
            aws_secret_access_key=self.config.aws.secret_access_key,
        )
        self._is_open = False
        # Owns the resource/client contexts entered in __aenter__
        self._exit_stack: contextlib.AsyncExitStack | None = None

    def _get_client_kwargs(self) -> S3ClientKwargs:
        endpoint_url = None
        if self.config.aws.s3_endpoint_url is not None:
            endpoint_url = str(self.config.aws.s3_endpoint_url)
        return S3ClientKwargs(
            region_name=self.config.aws.region_name,
            endpoint_url=endpoint_url,
        )

    async def __aenter__(self) -> Self:
        client_kw = self._get_client_kwargs()
        async with contextlib.AsyncExitStack() as stack:
            resource = await stack.enter_async_context(
                self.session.resource('s3', **client_kw)
            )
            self.s3_resource = cast(S3ServiceResource, resource)
            client = await stack.enter_async_context(
                self.session.client('s3', **client_kw)  # type: ignore
            )
            self.s3_client = cast(S3Client, client)
            self.bucket = await self.s3_resource.Bucket(self.config.aws.bucket_name)
            # Everything opened successfully: detach the contexts so they
            # outlive this ``async with``. If anything above raised, the
            # stack has already closed whatever had been opened.
            self._exit_stack = stack.pop_all()
        self._is_open = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if not self._is_open:
            return
        self._is_open = False
        stack, self._exit_stack = self._exit_stack, None
        if stack is not None:
            # Closes the client and the resource; each is closed even if
            # closing the other raises.
            await stack.__aexit__(exc_type, exc_val, exc_tb)

    def url_for_key(self, key: Key, scheme: str|None = None) -> URL:
        """Get a URL for an S3 key within :attr:`bucket`
        """
        return self.config.aws.get_object_url(key, scheme=scheme)

    async def get_object(self, key: Key) -> Object:
        """Get an S3 object within :attr:`bucket` by key

        The object's metadata is loaded before it is returned.
        """
        key = _key_to_str(key)
        obj = await self.bucket.Object(key)
        await obj.load()
        return obj

    async def iter_objects(self, prefix: Path|str) -> AsyncGenerator[ObjectSummary, None]:
        """Iterate over objects in :attr:`bucket` with a given (optional) prefix

        A :class:`~pathlib.Path` prefix is treated as a directory: it is
        normalized with ``_key_to_str`` and a trailing ``/`` is appended.
        String prefixes are used as-is.
        """
        if isinstance(prefix, Path):
            prefix = f'{_key_to_str(prefix)}/'
        async for obj in self.bucket.objects.filter(Prefix=prefix):
            yield obj

    async def object_exists(self, key: Key) -> bool:
        """Check if an object exists in :attr:`bucket` by key

        Returns ``False`` only for a ``404`` error code; any other
        :class:`botocore.exceptions.ClientError` is re-raised.
        """
        try:
            await self.get_object(key)
            return True
        except botocore.exceptions.ClientError as exc:
            if exc.response.get('Error', {}).get('Code') == '404':
                return False
            raise

    async def get_object_tags(self, obj: Object|Key) -> dict[str, str]:
        """Get the tags for an S3 object as a ``{key: value}`` mapping
        """
        key = _key_to_str(obj) if isinstance(obj, (Path, str)) else obj.key
        tag_set = await self.s3_client.get_object_tagging(
            Bucket=self.bucket.name,
            Key=key,
        )
        return {tag['Key']: tag['Value'] for tag in tag_set['TagSet']}

    async def get_object_sha1(self, obj: Object|Key) -> SHA1Hash|None:
        """Get the SHA1 hash for an S3 object

        The hash is read from the object's ``SHA1`` tag (written by
        :meth:`upload`); ``None`` is returned when the tag is absent.
        """
        tags = await self.get_object_tags(obj)
        if 'SHA1' not in tags:
            return None
        return SHA1Hash(tags['SHA1'])

    async def check_object_hash(self, obj: Object|Key, local_hash: SHA1Hash) -> None:
        """Check the SHA1 hash of an S3 object against a local hash

        Raises:
            HashMismatchError: If the hashes do not match
        """
        remote_hash = await self.get_object_sha1(obj)
        if remote_hash != local_hash:
            raise HashMismatchError(
                f'Hash mismatch for {obj}', local_hash, remote_hash,
            )

    @overload
    async def upload(
        self,
        key: Key,
        local_filename: Path,
        wait_exists: Literal[True] = ...,
        check_exists: bool = ...,
        local_meta: FileMeta|None = ...,
        acl: ACL|None = ...,
    ) -> Object: ...
    @overload
    async def upload(
        self,
        key: Key,
        local_filename: Path,
        wait_exists: Literal[False],
        check_exists: bool = ...,
        local_meta: FileMeta|None = ...,
        acl: ACL|None = ...,
    ) -> Key: ...
    @overload
    async def upload(
        self,
        key: Key,
        local_filename: Path,
        wait_exists: bool,
        check_exists: bool = ...,
        local_meta: FileMeta|None = ...,
        acl: ACL|None = ...,
    ) -> Object|Key: ...
    async def upload(
        self,
        key: Key,
        local_filename: Path,
        wait_exists: bool = True,
        check_exists: bool = True,
        local_meta: FileMeta|None = None,
        acl: ACL|None = None,
    ) -> Object|Key:
        """Upload a file to S3

        The file's content type (guessed from its name, falling back to
        ``application/octet-stream``) and SHA1 hash are stored as object tags.

        Arguments:
            key: The S3 key to upload the file to
            local_filename: The local file to upload
            wait_exists: If ``True``, wait for the object to exist and return
                it as an :class:`Object`; otherwise return the key as a string
            check_exists: If ``True``, check if the object exists before
                uploading and skip the upload when it does
            local_meta: The metadata for the local file. When it carries a
                ``sha1`` that value is used instead of hashing the file
            acl: The ACL for the uploaded object
        """
        async def get_local_hash() -> SHA1Hash:
            if local_meta is not None and local_meta.sha1 is not None:
                return local_meta.sha1
            return await get_file_hash_async('sha1', local_filename)

        key = _key_to_str(key)
        if check_exists and await self.object_exists(key):
            logger.warning(f'Object already exists: {key}')
            if wait_exists:
                return await self.get_object(key)
            return key
        mt = mimetypes.guess_type(local_filename)[0]
        if mt is None:
            # Unknown extension: use the generic binary type instead of
            # failing the whole upload.
            mt = 'application/octet-stream'
        local_hash = await get_local_hash()
        tags = {
            'Content-Type': mt,
            'SHA1': local_hash,
        }
        extra_args: dict[str, str] = {
            'ContentType': mt,
            'Tagging': urlencode(tags),
        }
        if acl is not None:
            extra_args['ACL'] = acl
        await self.bucket.upload_file(
            Filename=str(local_filename),
            Key=key,
            ExtraArgs=extra_args,
        )
        if wait_exists:
            waiter = self.s3_client.get_waiter('object_exists')
            await waiter.wait(
                Bucket=self.bucket.name,
                Key=key,
            )
            return await self.get_object(key)
        return key

    async def download_object(
        self,
        key: Key,
        local_filename: Path
    ) -> None:
        """Download an S3 object to a local file
        """
        key = _key_to_str(key)
        await self.bucket.download_file(
            Key=key,
            Filename=str(local_filename),
        )
class ClipClient(ClientBase):
    """AWS Client to upload items from a :class:`.model.ClipCollection`
    """
    def __init__(
        self,
        config: Config,
        max_clips: int = 8,
        scheduler_limit: int = 8,
    ) -> None:
        super().__init__(config)
        self.max_clips = max_clips
        self.scheduler_limit = scheduler_limit
        self.completion_counts = CompletionCounts(max_clips, enable_log=True)

    @property
    def upload_dir(self) -> Path:
        """Root folder name (key prefix) for uploaded clips
        (alias for :attr:`.config.AWSConfig.clips_prefix`)
        """
        return self.config.aws.clips_prefix

    async def __aenter__(self) -> Self:
        await super().__aenter__()
        self.schedulers: SchedulersTD = {
            'general': await aiojobs.create_scheduler(),
            'clip_uploads': await aiojobs.create_scheduler(limit=16, pending_limit=1),
            'uploads': await aiojobs.create_scheduler(limit=self.scheduler_limit),
            'upload_checks': await aiojobs.create_scheduler(limit=16, pending_limit=1),
        }
        self.upload_check_waiters = JobWaiters[tuple[bool, Clip, set[ClipFileUploadKey]]](
            scheduler=self.schedulers['upload_checks'],
        )
        self.upload_clip_waiters = JobWaiters[bool](
            scheduler=self.schedulers['clip_uploads'],
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        logger.info('closing schedulers..')
        try:
            await asyncio.gather(*[sch.close() for sch in self.schedulers.values()])
        finally:
            # Release the S3 resource/client even if a scheduler failed to
            # close cleanly.
            await super().__aexit__(exc_type, exc_val, exc_tb)

    def get_clip_file_upload_path(self, clip: Clip, key: ClipFileUploadKey) -> Path:
        """Get the uploaded filename for a clip asset (relative to :attr:`upload_dir`)
        """
        rel_filename = clip.get_file_path(key, absolute=False)
        return self.upload_dir / rel_filename

    async def upload_data_file(self) -> None:
        """Upload the clips data file to S3

        The upload is skipped when the remote object's ``SHA1`` tag already
        matches the local file's hash.
        """
        local_file = self.config.data_file
        remote_file = self.upload_dir / local_file.name
        if await self.object_exists(remote_file):
            remote_hash = await self.get_object_sha1(remote_file)
            local_hash = await get_file_hash_async('sha1', local_file)
            if remote_hash == local_hash:
                logger.info(f'Data file matches hosted version: {remote_file}')
                return
        logger.info(f'Uploading {local_file} to {remote_file}')
        await self.upload(
            key=remote_file,
            local_filename=local_file,
            check_exists=False,
        )

    @logger.catch(reraise=True)
    async def upload_clip(self, clip: Clip, *file_keys: ClipFileUploadKey) -> bool:
        """Upload all assets for the given clip

        Arguments:
            clip: The clip whose local asset files should be uploaded
            *file_keys: If given, only these assets are uploaded

        Returns:
            ``True`` if no asset was actually uploaded (nothing to do, or every
            upload failed / was skipped), ``False`` otherwise
        """
        waiter = JobWaiters[bool](scheduler=self.schedulers['uploads'])

        async def _do_upload(
            key: ClipFileUploadKey,
            local_file: Path,
            remote_file: Path
        ) -> bool:
            # Upload a single asset; returns False on a ClientError
            logger.debug(f'uploading {local_file} to {remote_file}')
            local_meta = clip.files.get_metadata(key)
            assert local_meta is not None
            try:
                await self.upload(
                    key=remote_file,
                    local_filename=local_file,
                    check_exists=True,
                    wait_exists=False,
                    local_meta=local_meta,
                    acl='public-read',
                )
                return True
            except botocore.exceptions.ClientError as exc:
                logger.error(f'Failed to upload {local_file}: {exc}')
                return False

        for file_key, local_file in clip.iter_paths(for_download=False):
            if file_keys and file_key not in file_keys:
                continue
            if not local_file.exists():
                continue
            remote_file = self.get_clip_file_upload_path(clip, file_key)
            await waiter.spawn(_do_upload(file_key, local_file, remote_file))
        if not len(waiter):
            self.completion_counts.num_completed += 1
            return True
        results = await waiter
        all_skipped = True not in results
        logger.success(f'Upload complete for clip "{clip.unique_name}"')
        self.completion_counts.num_completed += 1
        return all_skipped

    @logger.catch(reraise=True)
    async def check_clip_needs_upload(self, clip: Clip) -> tuple[bool, Clip, set[ClipFileUploadKey]]:
        """Check if the given clip has any assets that need to be uploaded

        Returns:
            A tuple of ``(needs_upload, clip, file_keys)`` where *file_keys*
            are the assets that exist locally but not in :attr:`bucket`
        """
        remote_files: dict[Path, ClipFileUploadKey] = {}
        for file_key, filename in clip.iter_paths(for_download=False):
            if not filename.exists():
                continue
            remote_file = self.get_clip_file_upload_path(clip, file_key)
            remote_files[remote_file] = file_key
        if not remote_files:
            # No local assets yet: nothing to upload and no remote directory
            # to list (the single-parent check below would otherwise fail).
            return False, clip, set()
        parent_dirs = {f.parent for f in remote_files}
        assert len(parent_dirs) == 1, f'{parent_dirs=}'
        parent_dir = parent_dirs.pop()
        async for obj_summary in self.iter_objects(parent_dir):
            remote_file = Path(obj_summary.key)
            if remote_file in remote_files:
                del remote_files[remote_file]
        return len(remote_files) > 0, clip, set(remote_files.values())

    async def handle_upload_check_jobs(self) -> None:
        """Wait for jobs from :meth:`check_clip_needs_upload` and spawn their
        :meth:`upload_clip` jobs

        Stops queueing once :attr:`completion_counts` is full.
        """
        if self.completion_counts.full:
            return
        async for job in self.upload_check_waiters:
            if job.exception is not None:
                raise job.exception
            needs_upload, clip, file_keys = job.result
            if not needs_upload:
                continue
            await self.upload_clip_waiters.spawn(self.upload_clip(clip, *file_keys))
            self.completion_counts.num_queued += 1
            if self.completion_counts.full:
                break

    async def upload_all(self, clips: ClipCollection) -> None:
        """Upload assets for the given clips (up to :attr:`max_clips` clips),
        then upload the clips data file
        """
        upload_check_sch = self.schedulers['upload_checks']
        upload_check_limit = upload_check_sch.limit
        assert upload_check_limit is not None
        for clip in clips:
            await self.upload_check_waiters.spawn(self.check_clip_needs_upload(clip))
            if len(self.upload_check_waiters) >= upload_check_limit:
                await self.handle_upload_check_jobs()
            if self.completion_counts.full:
                break
        await self.handle_upload_check_jobs()
        if len(self.upload_check_waiters):
            logger.info(f'waiting for check_waiters ({len(self.upload_check_waiters)=})')
            await self.upload_check_waiters
        self.completion_counts.max_items = self.completion_counts.num_queued
        if len(self.upload_clip_waiters):
            logger.info(f'waiting for waiters ({len(self.upload_clip_waiters)=})')
            await self.upload_clip_waiters
        await self.upload_data_file()
        logger.success('all waiters finished')
# Type parameter for the legistar data model held by LegistarClientBase
# subclasses (stored as their ``legistar_data`` attribute)
_ModelT = TypeVar(
    '_ModelT',
    bound=AbstractLegistarModel,
)
class LegistarClientBase(ClientBase, Generic[_GuidT, _ItemT, _ModelT], ABC):
    """Base class for clients that upload files described by a legistar
    data model (see :class:`LegistarClient` and :class:`RGuidLegistarClient`)
    """
    max_clips: int
    """The maximum number of items to upload"""
    legistar_data: _ModelT
    """A :class:`~.legistar.model.AbstractLegistarModel` instance"""
    item_scheduler: aiojobs.Scheduler
    check_scheduler: aiojobs.Scheduler
    upload_scheduler: aiojobs.Scheduler

    def __init__(
        self,
        config: Config,
        max_clips: int,
        legistar_data: _ModelT|None = None
    ) -> None:
        super().__init__(config)
        self.max_clips = max_clips
        if legistar_data is None:
            legistar_data = self._load_legistar_data()
        self.legistar_data = legistar_data
        self.completion_counts = CompletionCounts(max_clips, enable_log=True)

    @property
    @abstractmethod
    def upload_dir(self) -> Path: ...

    @property
    @abstractmethod
    def legistar_data_file(self) -> Path: ...

    @abstractmethod
    def _load_legistar_data(self) -> _ModelT: ...

    @abstractmethod
    def iter_item_guids(self) -> Iterator[_GuidT]: ...

    async def __aenter__(self) -> Self:
        self.item_scheduler = aiojobs.Scheduler(limit=16)
        self.check_scheduler = aiojobs.Scheduler(limit=16, pending_limit=1)
        self.upload_scheduler = aiojobs.Scheduler(limit=32)
        self.check_waiters = JobWaiters[tuple[bool, _GuidT, set[LegistarFileUID]]](
            scheduler=self.check_scheduler
        )
        self.item_waiters = JobWaiters[bool](scheduler=self.item_scheduler)
        return await super().__aenter__()

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        try:
            await self.check_scheduler.wait_and_close()
            await self.item_scheduler.wait_and_close()
            await self.upload_scheduler.wait_and_close()
        finally:
            # Release the S3 resource/client even if a scheduler failed to
            # close cleanly.
            await super().__aexit__(exc_type, exc_val, exc_tb)

    def get_file_upload_path(self, guid: _GuidT, uid: LegistarFileUID) -> Path:
        """Get the uploaded filename for a legistar asset (relative to :attr:`upload_dir`)
        """
        filename, _ = self.legistar_data.get_path_for_uid(guid, uid)
        assert not filename.is_absolute()
        filename = filename.relative_to(self.legistar_data.root_dir)
        return self.upload_dir / filename

    async def upload_data_file(self) -> None:
        """Upload the legistar data file to S3

        The upload is skipped when the remote object's ``SHA1`` tag already
        matches the local file's hash.
        """
        local_file = self.legistar_data_file
        remote_file = self.upload_dir / local_file.name
        if await self.object_exists(remote_file):
            remote_hash = await self.get_object_sha1(remote_file)
            local_hash = await get_file_hash_async('sha1', local_file)
            if remote_hash == local_hash:
                logger.info(f'Data file matches hosted version: {remote_file}')
                return
        logger.info(f'Uploading {local_file} to {remote_file}')
        await self.upload(
            key=remote_file,
            local_filename=local_file,
            check_exists=False,
        )

    async def upload_legistar_file(
        self,
        guid: _GuidT,
        uid: LegistarFileUID,
        local_file: Path,
        local_meta: FileMeta,
    ) -> bool:
        """Upload a single legistar file

        Returns:
            ``True`` on success, ``False`` if S3 returned a
            :class:`botocore.exceptions.ClientError`
        """
        remote_file = self.get_file_upload_path(guid, uid)
        try:
            await self.upload(
                key=remote_file,
                local_filename=local_file,
                check_exists=True,
                local_meta=local_meta,
                acl='public-read',
            )
            return True
        except botocore.exceptions.ClientError as exc:
            logger.error(f'Failed to upload {local_file}: {exc}')
            return False

    async def upload_legistar_item(self, guid: _GuidT, *uids: LegistarFileUID) -> bool:
        """Upload all assets for the legistar item matching the given *guid*

        Arguments:
            guid: The item to upload
            *uids: If given, only these files are uploaded

        Returns:
            ``True`` if no file was actually uploaded, ``False`` otherwise
        """
        waiter = JobWaiters[bool](scheduler=self.upload_scheduler)
        it = self.legistar_data.iter_files_for_upload(guid)
        for uid, local_file, local_meta, _is_attachment in it:
            if uids and uid not in uids:
                continue
            if not local_file.exists():
                continue
            await waiter.spawn(
                self.upload_legistar_file(guid, uid, local_file, local_meta)
            )
        if not len(waiter):
            # The item was counted as queued, so count it as completed too
            # (matches ClipClient.upload_clip).
            self.completion_counts.num_completed += 1
            return True
        results = await waiter
        all_skipped = True not in results
        logger.success(f'Upload complete for Legistar item "{guid}"')
        self.completion_counts.num_completed += 1
        return all_skipped

    async def check_item_needs_upload(self, guid: _GuidT) -> tuple[bool, _GuidT, set[LegistarFileUID]]:
        """Check if the item matching *guid* has any local files that need to be uploaded

        Returns:
            A tuple of ``(needs_upload, guid, uids)`` where *uids* are the
            files that exist locally but not in :attr:`bucket`
        """
        remote_files: dict[Path, LegistarFileUID] = {}
        it = self.legistar_data.iter_files_for_upload(guid)
        for uid, local_file, _local_meta, _is_attachment in it:
            if not local_file.exists():
                continue
            remote_file = self.get_file_upload_path(guid, uid)
            remote_files[remote_file] = uid
        if not remote_files:
            return False, guid, set()
        # attachments use a subdirectory, so there may be several parents
        parent_dirs = {f.parent for f in remote_files}
        for parent_dir in parent_dirs:
            async for obj_summary in self.iter_objects(parent_dir):
                remote_file = Path(obj_summary.key)
                if remote_file in remote_files:
                    del remote_files[remote_file]
        return len(remote_files) > 0, guid, set(remote_files.values())

    async def handle_upload_check_jobs(self) -> None:
        """Wait for jobs from :meth:`check_item_needs_upload` and spawn their
        :meth:`upload_legistar_item` jobs

        Stops queueing once :attr:`completion_counts` is full.
        """
        if self.completion_counts.full:
            return
        async for job in self.check_waiters:
            if job.exception is not None:
                raise job.exception
            needs_upload, guid, uids = job.result
            if not needs_upload:
                continue
            await self.item_waiters.spawn(self.upload_legistar_item(guid, *uids))
            self.completion_counts.num_queued += 1
            if self.completion_counts.full:
                break

    async def upload_all(self) -> None:
        """Upload files for all items in :attr:`legistar_data` (up to
        :attr:`max_clips` items), then upload the legistar data file
        """
        upload_check_sch = self.check_scheduler
        upload_check_limit = upload_check_sch.limit
        assert upload_check_limit is not None
        for guid in self.iter_item_guids():
            await self.check_waiters.spawn(self.check_item_needs_upload(guid))
            if len(self.check_waiters) >= upload_check_limit:
                await self.handle_upload_check_jobs()
            if self.completion_counts.full:
                break
        await self.handle_upload_check_jobs()
        if len(self.check_waiters):
            logger.info(f'waiting for check_waiters ({len(self.check_waiters)=})')
            await self.check_waiters
        self.completion_counts.max_items = self.completion_counts.num_queued
        if len(self.item_waiters):
            logger.info(f'waiting for waiters ({len(self.item_waiters)=})')
            await self.item_waiters
        await self.upload_data_file()
        logger.success('all waiters finished')
class LegistarClient(LegistarClientBase[GUID, DetailPageResult, LegistarData]):
    """Client to upload items from :class:`~.legistar.model.LegistarData`
    """
    @property
    def upload_dir(self) -> Path:
        """Key prefix for uploads (``config.aws.legistar_prefix``)"""
        aws_cfg = self.config.aws
        return aws_cfg.legistar_prefix

    @property
    def legistar_data_file(self) -> Path:
        """Local data file (``config.legistar.data_file``)"""
        legistar_cfg = self.config.legistar
        return legistar_cfg.data_file

    def _load_legistar_data(self) -> LegistarData:
        data_file = self.legistar_data_file
        return LegistarData.load(data_file)

    def iter_item_guids(self) -> Iterator[GUID]:
        """Yield each key of :attr:`legistar_data`"""
        for guid in self.legistar_data.keys():
            yield guid
class RGuidLegistarClient(LegistarClientBase[REAL_GUID, RGuidDetailResult, RGuidLegistarData]):
    """Client to upload items from :class:`~.legistar.guid_model.RGuidLegistarData`
    """
    @property
    def upload_dir(self) -> Path:
        """Key prefix for uploads (``config.aws.legistar_rguid_prefix``)"""
        aws_cfg = self.config.aws
        return aws_cfg.legistar_rguid_prefix

    @property
    def legistar_data_file(self) -> Path:
        """Local data file, as resolved by ``RGuidLegistarData._get_data_file``"""
        cfg = self.config
        return RGuidLegistarData._get_data_file(cfg)

    def _load_legistar_data(self) -> RGuidLegistarData:
        data_file = self.legistar_data_file
        return RGuidLegistarData.load(data_file)

    def iter_item_guids(self) -> Iterator[REAL_GUID]:
        """Yield each key of :attr:`legistar_data`"""
        for guid in self.legistar_data.keys():
            yield guid
@logger.catch
async def upload_clips(
    clips: ClipCollection,
    config: Config,
    max_clips: int,
    scheduler_limit: int,
) -> None:
    """Upload assets for *clips* with a :class:`ClipClient`"""
    async with ClipClient(
        config=config,
        max_clips=max_clips,
        scheduler_limit=scheduler_limit,
    ) as client:
        await client.upload_all(clips)


@logger.catch
async def upload_legistar(
    config: Config,
    max_clips: int,
) -> None:
    """Upload legistar files with a :class:`LegistarClient`"""
    async with LegistarClient(config=config, max_clips=max_clips) as client:
        await client.upload_all()


@logger.catch
async def upload_legistar_rguid(
    config: Config,
    max_clips: int,
) -> None:
    """Upload legistar files with a :class:`RGuidLegistarClient`"""
    async with RGuidLegistarClient(config=config, max_clips=max_clips) as client:
        await client.upload_all()