granicus_archiver.utils
- exception granicus_archiver.utils.HashMismatchError
Bases: ValueError
Raised when a hash comparison fails
- granicus_archiver.utils.remove_pdf_links(infile: Path, outfile: Path) -> None
Remove hyperlinks from a PDF file
- Parameters:
infile (Path) – The input PDF file
outfile (Path) – Output filename
- Return type:
None
- class granicus_archiver.utils.JobWaiter(job: Job[T])
Bases: Generic[T], Awaitable[T]
Wrapper for aiojobs.Job to wait for its result
Instances of this class are awaitable and hashable
- Parameters:
job (Job[T])
- task: Task[T]
An asyncio.Task to await the job's wait() method
- job: Job[T]
The aiojobs.Job instance
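The wrapper pattern described for JobWaiter can be illustrated with the standard library alone. The following is a minimal sketch, not the library's implementation: FakeJob is a hypothetical stand-in for aiojobs.Job that exposes only an async wait() method, and WaiterSketch is an illustrative name. It shows one way an object can hold a task for job.wait(), be awaited directly, and still be used as a dictionary key.

```python
import asyncio
from typing import Any, Generator, Generic, TypeVar

T = TypeVar("T")


class FakeJob(Generic[T]):
    """Hypothetical stand-in for aiojobs.Job exposing only an async wait()."""

    def __init__(self, value: T, delay: float = 0.01) -> None:
        self._value = value
        self._delay = delay

    async def wait(self) -> T:
        await asyncio.sleep(self._delay)
        return self._value


class WaiterSketch(Generic[T]):
    """Awaitable wrapper; must be created while an event loop is running."""

    def __init__(self, job: FakeJob[T]) -> None:
        self.job = job
        # Schedule job.wait() right away so the result is collected even if
        # the wrapper itself is awaited later.
        self.task: "asyncio.Task[T]" = asyncio.create_task(job.wait())

    def __await__(self) -> Generator[Any, None, T]:
        # Awaiting the wrapper simply awaits the underlying task.
        return self.task.__await__()


async def demo() -> int:
    waiter = WaiterSketch(FakeJob(42))
    # No __eq__ is defined, so the default identity hash applies and the
    # wrapper can serve as a dict key or set member.
    lookup = {waiter: "tracked"}
    assert lookup[waiter] == "tracked"
    return await waiter
```

Running asyncio.run(demo()) returns the value produced by the fake job. Whether the real class defines a custom hash is not stated in this reference, so the identity hash used here is an assumption.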
- class granicus_archiver.utils.JobResult(job: Job[T], result: T | NotSetType, exception: BaseException | None = None)
Bases: Generic[T]
A completed aiojobs.Job
- Parameters:
job (Job[T])
result (T | NotSetType)
exception (BaseException | None)
- job: Job[T]
The job instance
- exception: BaseException | None
An exception, if one was encountered by the job
- class granicus_archiver.utils.JobWaiters(scheduler: Scheduler | None = None)
Bases: Sized, Iterable[JobWaiter[T]], Container[JobWaiter[T] | Job[T]], Awaitable[list[T]], AsyncIterable[JobResult[T]]
Container for aiojobs.Job instances to await their results
Jobs may be awaited using the wait() and gather() methods, or iterated asynchronously using async for
- Parameters:
scheduler (Scheduler | None)
- waiters: dict[Job[T], JobWaiter[T]]
Mapping of aiojobs.Job instances to their JobWaiter
- waiter_tasks: dict[Task[T], JobWaiter[T]]
Mapping of each JobWaiter.task to its JobWaiter
- scheduler: Scheduler | None
Optional aiojobs.Scheduler instance
- add(job: Job[T]) -> JobWaiter
Add an existing aiojobs.Job instance
If the job is already tracked, this becomes a no-op
- Parameters:
job (Job[T])
- Return type:
JobWaiter
- async spawn(coro: Coroutine[object, object, T], name: str | None = None) -> Job[T]
Spawn a job using the scheduler (if it was set)
The arguments match those of the aiojobs.Scheduler.spawn() method
- discard(job_or_waiter: Job[T] | JobWaiter[T]) -> None
Remove a Job (if it is currently being tracked)
- Parameters:
job_or_waiter (Job[T] | JobWaiter[T])
- Return type:
None
- async wait(return_when: Literal['FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED'] = 'FIRST_COMPLETED') -> tuple[list[JobResult[T]], set[Job[T]]]
Wait for the next job completion
This method is similar to asyncio.wait(), aside from the slight difference in return type.
- Returns:
done: A list of completed JobResult instances
pending: A set of pending aiojobs.Job instances
- Return type:
tuple
- Parameters:
return_when (Literal['FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED'])
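Since wait() is described as similar to asyncio.wait(), the draining loop it enables can be sketched with plain asyncio tasks. This example does not use JobWaiters at all; work() and drain() are hypothetical names. Note the difference the description mentions: asyncio.wait() returns two sets of tasks, whereas the method documented here returns a list of JobResult instances and a set of pending jobs.

```python
import asyncio
from typing import List, Tuple


async def work(delay: float, value: int) -> int:
    """Hypothetical job body: sleep briefly, then return a value."""
    await asyncio.sleep(delay)
    return value


async def drain() -> Tuple[List[int], int]:
    pending = {asyncio.create_task(work(0.01 * i, i)) for i in range(1, 4)}
    results: List[int] = []
    rounds = 0
    # Each pass returns as soon as at least one task finishes; the rest
    # stay in `pending` and are waited on again in the next pass.
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        rounds += 1
        results.extend(task.result() for task in done)
    return sorted(results), rounds
```

The number of passes depends on timing, because more than one task may finish before asyncio.wait() returns.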
- async as_completed() -> AsyncGenerator[JobResult[T], None]
An asynchronous generator of completed jobs (wrapped as JobResult):
    waiter = JobWaiters()
    ...
    async for result in waiter.as_completed():
        ...
The same could be accomplished using async for on the instance itself:
    waiter = JobWaiters()
    ...
    async for result in waiter:
        ...
- Return type:
AsyncGenerator[JobResult[T], None]
- granicus_archiver.utils.get_file_hash(hash_type: Literal['md5'], p: Path) -> MD5Hash
- granicus_archiver.utils.get_file_hash(hash_type: Literal['sha1'], p: Path) -> SHA1Hash
- granicus_archiver.utils.get_file_hash(hash_type: Literal['sha256'], p: Path) -> SHA256Hash
Get the hash for the contents of a file
- Parameters:
p – The file path
hash_type – The hash type ('md5', 'sha1', or 'sha256')
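To make the hashing and comparison workflow concrete, here is a minimal sketch built only on hashlib. It is not the library's implementation: file_hash_sketch, verify_file_hash and HashMismatchSketch are hypothetical names, the chunked read is an assumed strategy, and returning a hex digest string is an assumption about what the MD5Hash, SHA1Hash and SHA256Hash types hold.

```python
import hashlib
import tempfile
from pathlib import Path


class HashMismatchSketch(ValueError):
    """Local stand-in for HashMismatchError (an assumption, not the real class)."""


def file_hash_sketch(hash_type: str, p: Path, chunk_size: int = 65536) -> str:
    """Return the hex digest of a file's contents, read in chunks."""
    if hash_type not in ("md5", "sha1", "sha256"):
        raise ValueError(f"unsupported hash type: {hash_type!r}")
    h = hashlib.new(hash_type)
    with p.open("rb") as fd:
        # Chunked reads keep memory use flat regardless of file size
        while chunk := fd.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()


def verify_file_hash(hash_type: str, p: Path, expected: str) -> None:
    """Raise HashMismatchSketch if the file's digest differs from `expected`."""
    actual = file_hash_sketch(hash_type, p)
    if actual != expected:
        raise HashMismatchSketch(f"{p}: expected {expected}, got {actual}")


def demo() -> str:
    with tempfile.TemporaryDirectory() as tmp:
        p = Path(tmp) / "sample.bin"
        p.write_bytes(b"hello")
        digest = file_hash_sketch("sha256", p)
        verify_file_hash("sha256", p, digest)  # matches, so no exception
        return digest
```

Subclassing ValueError, as the documented exception does, lets callers that already handle ValueError catch a mismatch without importing a dedicated exception type.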
- async granicus_archiver.utils.get_file_hash_async(hash_type: Literal['md5'], p: Path) -> MD5Hash
- async granicus_archiver.utils.get_file_hash_async(hash_type: Literal['sha1'], p: Path) -> SHA1Hash
- async granicus_archiver.utils.get_file_hash_async(hash_type: Literal['sha256'], p: Path) -> SHA256Hash
Get the hash for the contents of a file asynchronously using aiofile
- Parameters:
p – The file path
hash_type – The hash type ('md5', 'sha1', or 'sha256')
- async granicus_archiver.utils.aio_read_iter(fd: FileIOWrapperBase, chunk_size: int = 65536, timeout_total: float | None = None, timeout_chunk: float | None = None) -> AsyncGenerator[str | bytes, None]
Iterate over chunked segments of a file descriptor as an asynchronous generator with optional timeouts
- Parameters:
fd (FileIOWrapperBase) – An aiofile.utils.FileIOWrapperBase (the context manager returned when using aiofile.utils.async_open() with async with)
chunk_size (int) – The chunk size passed to the aiofile.utils.FileIOWrapperBase.iter_chunked() method
timeout_total (float | None) – Timeout to apply for the entire read operation. If not given, no timeout will be enforced.
timeout_chunk (float | None) – Timeout to apply for each chunk iteration. If not given, no timeout will be enforced.
- Raises:
TimeoutError – If either timeout argument is supplied and its limit was reached
- Return type:
AsyncGenerator[str | bytes, None]
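The interaction between a per-chunk timeout and a total timeout can be sketched without aiofile. The example below is an assumption about how such limits might be combined, not the function's actual code: iter_with_timeouts and slow_source are hypothetical names, and the chunk source is a plain async generator rather than a file wrapper.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


async def iter_with_timeouts(
    source: AsyncIterator[bytes],
    timeout_total: Optional[float] = None,
    timeout_chunk: Optional[float] = None,
) -> AsyncIterator[bytes]:
    loop = asyncio.get_running_loop()
    deadline = None if timeout_total is None else loop.time() + timeout_total
    it = source.__aiter__()
    while True:
        # The limit for this chunk is the smaller of the per-chunk timeout
        # and whatever remains of the total budget (None means unlimited).
        limits = []
        if timeout_chunk is not None:
            limits.append(timeout_chunk)
        if deadline is not None:
            limits.append(max(0.0, deadline - loop.time()))
        limit = min(limits) if limits else None
        try:
            chunk = await asyncio.wait_for(it.__anext__(), timeout=limit)
        except StopAsyncIteration:
            return  # source exhausted
        yield chunk


async def slow_source() -> AsyncIterator[bytes]:
    """Hypothetical source: one quick chunk, then a long stall."""
    yield b"a"
    await asyncio.sleep(0.3)
    yield b"b"


async def collect(**timeouts: Optional[float]) -> Tuple[List[bytes], bool]:
    chunks: List[bytes] = []
    try:
        async for chunk in iter_with_timeouts(slow_source(), **timeouts):
            chunks.append(chunk)
    except asyncio.TimeoutError:
        return chunks, True  # a limit was reached mid-stream
    return chunks, False
```

Chunks received before a limit is hit are still delivered to the consumer, so a caller can decide whether to keep or discard partial data when the timeout error surfaces.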
- class granicus_archiver.utils.CompletionCounts(max_items: int | None = None, enable_log: bool = False, log_level: int | str = 'INFO')
Bases: object
Helper to track item queue and completion counts
    >>> counts = CompletionCounts(max_items=10)
    >>> counts
    <CompletionCounts: queued=0, completed=0, active=0, progress=0%>

    >>> counts.num_queued += 4
    >>> counts
    <CompletionCounts: queued=4, completed=0, active=4, progress=0%>

    >>> counts.num_completed += 1
    >>> counts
    <CompletionCounts: queued=4, completed=1, active=3, progress=10%>
    >>> counts.full
    False

    >>> counts.num_queued += 6
    >>> counts
    <CompletionCounts: queued=10, completed=1, active=9, progress=10%>
    >>> counts.full
    True

    >>> counts.complete
    False
    >>> for i in range(9):
    ...     counts.num_completed += 1
    ...     print(repr(counts))
    <CompletionCounts: queued=10, completed=2, active=8, progress=20%>
    <CompletionCounts: queued=10, completed=3, active=7, progress=30%>
    <CompletionCounts: queued=10, completed=4, active=6, progress=40%>
    <CompletionCounts: queued=10, completed=5, active=5, progress=50%>
    <CompletionCounts: queued=10, completed=6, active=4, progress=60%>
    <CompletionCounts: queued=10, completed=7, active=3, progress=70%>
    <CompletionCounts: queued=10, completed=8, active=2, progress=80%>
    <CompletionCounts: queued=10, completed=9, active=1, progress=90%>
    <CompletionCounts: queued=10, completed=10, active=0, progress=100%>

    >>> counts.complete
    True
The progress attribute will also be updated whenever max_items is changed:
    >>> counts = CompletionCounts(max_items=100)
    >>> counts.num_queued = 50
    >>> counts
    <CompletionCounts: queued=50, completed=0, active=50, progress=0%>
    >>> counts.num_queued = 50
    >>> counts
    <CompletionCounts: queued=50, completed=0, active=50, progress=0%>
    >>> counts.num_completed = 25
    >>> counts
    <CompletionCounts: queued=50, completed=25, active=25, progress=25%>
    >>> counts.max_items = 50
    >>> counts
    <CompletionCounts: queued=50, completed=25, active=25, progress=50%>
    >>> counts.num_completed = 50
    >>> counts
    <CompletionCounts: queued=50, completed=50, active=0, progress=100%>
    >>> counts.complete
    True
- enable_log: bool
If True, any changes to num_queued or num_completed will be logged
- log_level: int | str
The log level to use when logging changes to num_queued or num_completed
- property progress: int
Percent of items completed versus max_items
Note: This will be zero if max_items is None
- property full: bool
Whether all items have been queued
Note: This will always be False if max_items is None
- property complete: bool
Whether all items have been completed
Note: This will always be False if max_items is None
- reset() -> None
Reset all counters to zero
    >>> counts = CompletionCounts(max_items=4)
    >>> counts
    <CompletionCounts: queued=0, completed=0, active=0, progress=0%>

    >>> counts.num_queued = 4
    >>> counts.num_completed = 2
    >>> counts.full
    True
    >>> counts
    <CompletionCounts: queued=4, completed=2, active=2, progress=50%>

    >>> counts.reset()
    >>> counts.full
    False
    >>> counts
    <CompletionCounts: queued=0, completed=0, active=0, progress=0%>
- Return type:
None
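The relationships between the counters and the derived properties of CompletionCounts can be summarized in a small stand-in class. This is only a sketch: the formulas are inferred from the doctest output shown for the class and may not match the real implementation in edge cases (rounding, or counts exceeding max_items). CountsSketch and num_active are hypothetical names, and logging is omitted.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CountsSketch:
    """Hypothetical stand-in; formulas are inferred from the documented doctests."""

    max_items: Optional[int] = None
    num_queued: int = 0
    num_completed: int = 0

    @property
    def num_active(self) -> int:
        # Items queued but not yet completed ("active" in the documented repr)
        return self.num_queued - self.num_completed

    @property
    def progress(self) -> int:
        # Zero when max_items is None (or 0, which also avoids dividing by zero)
        if not self.max_items:
            return 0
        return self.num_completed * 100 // self.max_items

    @property
    def full(self) -> bool:
        return self.max_items is not None and self.num_queued >= self.max_items

    @property
    def complete(self) -> bool:
        return self.max_items is not None and self.num_completed >= self.max_items

    def reset(self) -> None:
        self.num_queued = 0
        self.num_completed = 0
```

Because progress is computed on access here, changing max_items is reflected immediately, which mirrors the documented behavior without claiming the real class works the same way internally.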