granicus_archiver.utils

exception granicus_archiver.utils.HashMismatchError[source]

Bases: ValueError

Raised when a hash comparison fails
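As a rough illustration of where such an exception fits, the sketch below checks a digest and raises on a mismatch. The `HashMismatchError` class here is a local stand-in that only mirrors the documented base class, and `verify_sha256` is a hypothetical helper that is not part of this module.

```python
import hashlib


class HashMismatchError(ValueError):
    """Local stand-in mirroring the documented exception (a ValueError subclass)."""


def verify_sha256(data: bytes, expected_hex: str) -> None:
    """Hypothetical helper: raise if the SHA-256 digest of ``data`` differs."""
    actual = hashlib.sha256(data).hexdigest()
    if actual != expected_hex:
        raise HashMismatchError(f"expected {expected_hex}, got {actual}")
```

Because the exception derives from ValueError, callers that already handle ValueError would also catch it.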

Remove hyperlinks from a pdf file

Parameters:
  • infile (Path) – The input PDF file

  • outfile (Path) – Output filename

Return type:

None

class granicus_archiver.utils.JobWaiter(job: Job[T])[source]

Bases: Generic[T], Awaitable[T]

Wrapper for aiojobs.Job to wait for its result

Instances of this class are awaitable and hashable

Parameters:

job (Job[T])

task: Task[T]

An asyncio.Task that awaits the job's wait() method

job: Job[T]

The aiojobs.Job instance

class granicus_archiver.utils.JobResult(job: Job[T], result: T | NotSetType, exception: BaseException | None = None)[source]

Bases: Generic[T]

A completed aiojobs.Job

Parameters:
  • job (Job[T])

  • result (T | NotSetType)

  • exception (BaseException | None)

job: Job[T]

The job instance

exception: BaseException | None

An exception, if one was encountered by the job

raise_exc() None[source]

Raise the exception if it exists

Return type:

None
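The pattern of carrying either a result or an exception, and re-raising on request, can be sketched as follows. `ResultSketch` is a hypothetical stand-in, not the JobResult class itself, and it omits the job reference and the NotSetType sentinel.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class ResultSketch(Generic[T]):
    """Hypothetical stand-in for the outcome of a completed job."""

    result: Optional[T] = None
    exception: Optional[BaseException] = None

    def raise_exc(self) -> None:
        # Re-raise the stored exception, if any; otherwise do nothing
        if self.exception is not None:
            raise self.exception
```

Calling `raise_exc()` before reading `result` lets a consumer treat failed jobs like ordinary raised exceptions.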

class granicus_archiver.utils.JobWaiters(scheduler: Scheduler | None = None)[source]

Bases: Sized, Iterable[JobWaiter[T]], Container[JobWaiter[T] | Job[T]], Awaitable[list[T]], AsyncIterable[JobResult[T]]

Container for aiojobs.Job instances to await their results

Jobs may be awaited using the wait() and gather() methods, or by asynchronous iteration with async for

Parameters:

scheduler (Scheduler | None)

jobs: set[JobWaiter[T]]

All currently tracked jobs wrapped in JobWaiter instances

waiters: dict[Job[T], JobWaiter[T]]

Mapping of aiojobs.Job instances to their JobWaiter

waiter_tasks: dict[Task[T], JobWaiter[T]]

Mapping of each JobWaiter.task to its JobWaiter

scheduler: Scheduler | None

Optional aiojobs.Scheduler instance

add(job: Job[T]) JobWaiter[source]

Add an existing aiojobs.Job instance

If the job is already tracked, this becomes a no-op

Parameters:

job (Job[T])

Return type:

JobWaiter

async spawn(coro: Coroutine[object, object, T], name: str | None = None) Job[T][source]

Spawn a job using the scheduler (if it was set)

The arguments match those of the aiojobs.Scheduler.spawn() method

Parameters:
  • coro (Coroutine[object, object, T])

  • name (str | None)

Return type:

Job[T]

discard(job_or_waiter: Job[T] | JobWaiter[T]) None[source]

Remove a Job (if it is currently being tracked)

Parameters:

job_or_waiter (Job[T] | JobWaiter[T])

Return type:

None

clear() None[source]

Clear all tracked jobs

Return type:

None

async wait(return_when: Literal['FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED'] = 'FIRST_COMPLETED') tuple[list[JobResult[T]], set[Job[T]]][source]

Wait for the next job completion

This method is similar to asyncio.wait(), aside from the slight difference in return type.

Return type:

tuple[list[JobResult[T]], set[Job[T]]]

Parameters:

return_when (Literal['FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED'])
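For comparison, this is how the underlying asyncio.wait() behaves with plain tasks: it returns a pair of sets (done, pending). The sketch uses only asyncio; `work` and `first_done` are hypothetical names, and nothing here calls JobWaiters itself.

```python
import asyncio


async def work(delay: float, value: int) -> int:
    await asyncio.sleep(delay)
    return value


async def first_done() -> tuple[list[int], int]:
    tasks = {
        asyncio.create_task(work(0.01, 1)),
        asyncio.create_task(work(0.5, 2)),
    }
    # Returns as soon as at least one task has finished
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    results = [t.result() for t in done]
    # Cancel what is still running and let the cancellations settle
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return results, len(pending)
```

Per the signature above, JobWaiters.wait() returns a list of JobResult objects and a set of Job objects in place of the two task sets.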

async as_completed() AsyncGenerator[JobResult[T], None][source]

An asynchronous generator of completed jobs (wrapped as JobResult):

waiter = JobWaiters()
...
async for result in waiter.as_completed():
    ...

The same could be accomplished using async for on the instance itself:

waiter = JobWaiters()
...
async for result in waiter:
    ...
Return type:

AsyncGenerator[JobResult[T], None]

async gather() list[T][source]

Wait for completion of all jobs and return their results as a list

The same could be accomplished by awaiting the instance directly:

waiter = JobWaiters()
...
results = await waiter
Return type:

list[T]
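The two equivalences described for as_completed() and gather() (async for on the instance, and awaiting the instance) rely on the `__aiter__` and `__await__` protocols. A minimal sketch of those protocols on plain asyncio tasks might look like this; `TaskGroupSketch` is a hypothetical class and makes no claim about how JobWaiters is implemented.

```python
import asyncio
from typing import Any, AsyncIterator, Coroutine, Generator


class TaskGroupSketch:
    """Hypothetical container (not JobWaiters) built on plain asyncio tasks."""

    def __init__(self) -> None:
        self.tasks: list = []

    def spawn(self, coro: Coroutine[Any, Any, Any]) -> None:
        # Must be called while an event loop is running
        self.tasks.append(asyncio.create_task(coro))

    async def gather(self) -> list:
        # Results come back in the order the tasks were added
        return list(await asyncio.gather(*self.tasks))

    def __await__(self) -> Generator[Any, None, list]:
        # ``await group`` delegates to ``group.gather()``
        return self.gather().__await__()

    async def __aiter__(self) -> AsyncIterator[Any]:
        # ``async for`` yields results as the tasks complete
        for fut in asyncio.as_completed(self.tasks):
            yield await fut
```

Defining `__aiter__` as an async generator keeps the iteration logic in one place, at the cost of creating a fresh generator for every `async for` loop.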

async close() None[source]

Closes the scheduler (if set)

Return type:

None

granicus_archiver.utils.get_file_hash(hash_type: Literal['md5'], p: Path) MD5Hash[source]
granicus_archiver.utils.get_file_hash(hash_type: Literal['sha1'], p: Path) SHA1Hash
granicus_archiver.utils.get_file_hash(hash_type: Literal['sha256'], p: Path) SHA256Hash

Get the hash for the contents of a file

Parameters:
  • p – The file path

  • hash_type – The hash type ('md5', 'sha1', or 'sha256')
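A common way to hash file contents without loading the whole file into memory is to feed fixed-size chunks to hashlib. The sketch below shows that technique; `file_hash_sketch` and its `chunk_size` parameter are hypothetical, and it returns a plain hex string rather than the MD5Hash, SHA1Hash, or SHA256Hash types named in the signatures.

```python
import hashlib
from pathlib import Path


def file_hash_sketch(hash_type: str, p: Path, chunk_size: int = 65536) -> str:
    """Hypothetical helper: hex digest of a file's contents, read in chunks."""
    h = hashlib.new(hash_type)
    with p.open("rb") as fd:
        # iter() with a sentinel stops at the first empty read (end of file)
        for chunk in iter(lambda: fd.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()
```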

async granicus_archiver.utils.get_file_hash_async(hash_type: Literal['md5'], p: Path) MD5Hash[source]
async granicus_archiver.utils.get_file_hash_async(hash_type: Literal['sha1'], p: Path) SHA1Hash
async granicus_archiver.utils.get_file_hash_async(hash_type: Literal['sha256'], p: Path) SHA256Hash

Get the hash for the contents of a file asynchronously using aiofile

Parameters:
  • p – The file path

  • hash_type – The hash type ('md5', 'sha1', or 'sha256')
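Where aiofile is not available, a similar non-blocking effect can be approximated with the standard library by running the blocking read loop in a worker thread. This is a different technique from the aiofile-based one described above, and both function names below are hypothetical.

```python
import asyncio
import hashlib
from pathlib import Path


def _hash_blocking(hash_type: str, p: Path) -> str:
    """Blocking chunked hash, intended to run in a worker thread."""
    h = hashlib.new(hash_type)
    with p.open("rb") as fd:
        while chunk := fd.read(65536):
            h.update(chunk)
    return h.hexdigest()


async def file_hash_in_thread(hash_type: str, p: Path) -> str:
    # asyncio.to_thread (Python 3.9+) keeps the event loop free while hashing
    return await asyncio.to_thread(_hash_blocking, hash_type, p)
```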

granicus_archiver.utils.seconds_to_time_str(seconds: int) str[source]

Format seconds as HH:MM:SS

Parameters:

seconds (int)

Return type:

str
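One straightforward way to produce this format is two divmod steps. The function below is a hypothetical equivalent written for illustration; how the documented function treats negative values or durations of 100 hours or more is not stated here, so this sketch simply assumes non-negative input.

```python
def hhmmss_sketch(seconds: int) -> str:
    """Hypothetical equivalent: format a non-negative second count as HH:MM:SS."""
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


hhmmss_sketch(59)     # "00:00:59"
hhmmss_sketch(3661)   # "01:01:01"
hhmmss_sketch(86399)  # "23:59:59"
```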

async granicus_archiver.utils.aio_read_iter(fd: FileIOWrapperBase, chunk_size: int = 65536, timeout_total: float | None = None, timeout_chunk: float | None = None) AsyncGenerator[str | bytes, None][source]

Iterate over chunked segments of a file descriptor as an asynchronous generator with optional timeouts

Parameters:
  • fd (FileIOWrapperBase) – An aiofile.utils.FileIOWrapperBase (the context manager returned when using aiofile.utils.async_open() with async with)

  • chunk_size (int) – The chunk size passed to the aiofile.utils.FileIOWrapperBase.iter_chunked() method

  • timeout_total (float | None) – Timeout to apply for the entire read operation. If not given, no timeout will be enforced.

  • timeout_chunk (float | None) – Timeout to apply for each chunk iteration. If not given, no timeout will be enforced.

Raises:

TimeoutError – If either timeout argument is supplied and its limit was reached

Return type:

AsyncGenerator[str | bytes, None]
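The combination of a per-chunk timeout and an overall deadline can be sketched with asyncio.wait_for() around any async iterator. This is an illustration under assumptions, not the function's actual code: `iter_with_timeouts` is a hypothetical name, and it wraps a generic async iterator instead of an aiofile file object.

```python
import asyncio
import time
from typing import AsyncIterator, Optional


async def iter_with_timeouts(
    source: AsyncIterator[bytes],
    timeout_total: Optional[float] = None,
    timeout_chunk: Optional[float] = None,
) -> AsyncIterator[bytes]:
    """Hypothetical sketch: yield chunks from ``source`` under two optional timeouts."""
    deadline = None if timeout_total is None else time.monotonic() + timeout_total
    it = source.__aiter__()
    while True:
        # The effective limit for this chunk is the tighter of the two timeouts
        limits = []
        if timeout_chunk is not None:
            limits.append(timeout_chunk)
        if deadline is not None:
            limits.append(max(deadline - time.monotonic(), 0.0))
        limit = min(limits) if limits else None
        try:
            # Raises TimeoutError (asyncio.TimeoutError before Python 3.11)
            chunk = await asyncio.wait_for(it.__anext__(), timeout=limit)
        except StopAsyncIteration:
            return
        yield chunk
```

Recomputing the remaining time before every chunk means the total timeout covers only the time spent waiting on the source, since time the consumer spends between chunks still counts against the deadline.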

class granicus_archiver.utils.CompletionCounts(max_items: int | None = None, enable_log: bool = False, log_level: int | str = 'INFO')[source]

Bases: object

Helper to track item queue and completion counts

>>> counts = CompletionCounts(max_items=10)
>>> counts
<CompletionCounts: queued=0, completed=0, active=0, progress=0%>
>>> counts.num_queued += 4
>>> counts
<CompletionCounts: queued=4, completed=0, active=4, progress=0%>
>>> counts.num_completed += 1
>>> counts
<CompletionCounts: queued=4, completed=1, active=3, progress=10%>
>>> counts.full
False
>>> counts.num_queued += 6
>>> counts
<CompletionCounts: queued=10, completed=1, active=9, progress=10%>
>>> counts.full
True
>>> counts.complete
False
>>> for i in range(9):
...     counts.num_completed += 1
...     print(repr(counts))
<CompletionCounts: queued=10, completed=2, active=8, progress=20%>
<CompletionCounts: queued=10, completed=3, active=7, progress=30%>
<CompletionCounts: queued=10, completed=4, active=6, progress=40%>
<CompletionCounts: queued=10, completed=5, active=5, progress=50%>
<CompletionCounts: queued=10, completed=6, active=4, progress=60%>
<CompletionCounts: queued=10, completed=7, active=3, progress=70%>
<CompletionCounts: queued=10, completed=8, active=2, progress=80%>
<CompletionCounts: queued=10, completed=9, active=1, progress=90%>
<CompletionCounts: queued=10, completed=10, active=0, progress=100%>
>>> counts.complete
True

The progress attribute will also be updated whenever max_items is changed:

>>> counts = CompletionCounts(max_items=100)
>>> counts.num_queued = 50
>>> counts
<CompletionCounts: queued=50, completed=0, active=50, progress=0%>
>>> counts.num_queued = 50
>>> counts
<CompletionCounts: queued=50, completed=0, active=50, progress=0%>
>>> counts.num_completed = 25
>>> counts
<CompletionCounts: queued=50, completed=25, active=25, progress=25%>
>>> counts.max_items = 50
>>> counts
<CompletionCounts: queued=50, completed=25, active=25, progress=50%>
>>> counts.num_completed = 50
>>> counts
<CompletionCounts: queued=50, completed=50, active=0, progress=100%>
>>> counts.complete
True
Parameters:
  • max_items (int | None)

  • enable_log (bool)

  • log_level (int | str)

enable_log: bool

If True any changes to num_queued or num_completed will be logged

log_level: int | str

The log level to use when logging changes to num_queued or num_completed

property max_items: int | None

Maximum number of items

property num_queued: int

Number of items that have been queued

property num_completed: int

Number of items that have been completed

property num_active: int

Number of active items (num_queued - num_completed)

property progress: int

Percent of items completed versus max_items

Note

This will be zero if max_items is None

property full: bool

Whether all items have been queued

Note

This will always be False if max_items is None

property complete: bool

Whether all items have been completed

Note

This will always be False if max_items is None
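The relationships between these properties can be summarized in a small stand-in class. `CountsSketch` is hypothetical and omits logging; the integer rounding of `progress` and the use of `>=` in `full` and `complete` are assumptions chosen to agree with the doctest output shown earlier.

```python
from typing import Optional


class CountsSketch:
    """Hypothetical stand-in showing how the derived properties relate."""

    def __init__(self, max_items: Optional[int] = None) -> None:
        self.max_items = max_items
        self.num_queued = 0
        self.num_completed = 0

    @property
    def num_active(self) -> int:
        return self.num_queued - self.num_completed

    @property
    def progress(self) -> int:
        # Assumption: integer percentage rounded down; zero when max_items
        # is None (the falsy check also avoids dividing by zero)
        if not self.max_items:
            return 0
        return self.num_completed * 100 // self.max_items

    @property
    def full(self) -> bool:
        return self.max_items is not None and self.num_queued >= self.max_items

    @property
    def complete(self) -> bool:
        return self.max_items is not None and self.num_completed >= self.max_items
```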

reset() None[source]

Reset all counters to zero

>>> counts = CompletionCounts(max_items=4)
>>> counts
<CompletionCounts: queued=0, completed=0, active=0, progress=0%>
>>> counts.num_queued = 4
>>> counts.num_completed = 2
>>> counts.full
True
>>> counts
<CompletionCounts: queued=4, completed=2, active=2, progress=50%>
>>> counts.reset()
>>> counts.full
False
>>> counts
<CompletionCounts: queued=0, completed=0, active=0, progress=0%>
Return type:

None