Source code for granicus_archiver.googledrive.pathtree

from __future__ import annotations
from typing import NewType, NamedTuple, Self, Iterator
from os import PathLike
from pathlib import Path

from .types import FileId, FolderCache

__all__ = ('PathNode', 'PathPart', 'NodeId', 'NodePos')

PathPart = NewType('PathPart', str)
"""Type alias used for :attr:`PathNode.part`"""

NodeId = NewType('NodeId', str)
"""Type alias used for :attr:`PathNode.id`"""


[docs] class NodePos(NamedTuple): """Position of a :class:`PathNode` within its tree """ id: NodeId """The node's :attr:`~PathNode.id`""" level: int """The node's :attr:`~PathNode.nest_level`"""
[docs] class PathNode: """Node representing one segment of a collection of filesystem paths This is primarily used to aid in creating multiple folders within Drive while avoiding costly lookups for each parent folder's :obj:`~.types.FileId`. Instead, the :attr:`folder_id` for each known node in the tree is read from the :attr:`folder_cache`. Nodes without a :attr:`folder_id` will increase the :attr:`cost` of the node itself and its ancestors. Nodes with higher :attr:`cost` can then be prioritized during folder creation, setting their :attr:`folder_id` once it is known. Standard container methods are supported (aside from ``__setitem__`` and ``__delitem__``) Example: >>> a = PathNode('a', folder_cache={}) >>> len(a) 0 >>> 'b' in a False >>> b = a.add('b') >>> len(a) 1 >>> 'b' in a True >>> a['b'] is b True >>> foo = a.add('foo') >>> len(a) 2 >>> [item for item in a] [<PathNode: "a/b">, <PathNode: "a/foo">] Arguments: part: The path :attr:`part <pathlib.PurePath.parts>` for the node parent: The node's parent (or ``None`` for the root node) folder_cache: The :obj:`~.types.FolderCache` for the entire tree. This is only required/valid for the root node. """ part: PathPart """The path :attr:`part <pathlib.PurePath.parts>`""" full_path: Path """The full path as a combination of all :attr:`parent` :attr:`parts <part>`""" parent: Self|None """The parent node (or ``None`` if this is the :attr:`root` node)""" nest_level: int """The depth of this node from its :attr:`root` (with the root node beginning at ``0``) """ children: dict[PathPart, PathNode] """Direct children of this node """ def __init__( self, part: PathPart, parent: Self|None = None, folder_cache: FolderCache|None = None ) -> None: self.part = part self.parent = parent if self.parent is None: self.full_path = Path(part) self.nest_level = 0 else: self.full_path = self.parent.full_path / part self.nest_level = self.parent.nest_level + 1 if folder_cache is None: assert self.parent is not None self._folder_cache = folder_cache self.children = {} self._folder_id = self.folder_cache.get(self.full_path) self._cost = 1 if self.folder_id is None else 0
[docs] @classmethod def create_from_paths(cls, *paths: PathLike, folder_cache: FolderCache) -> PathNode: """Create a root node from the given path(s) If multiple paths are given, they must all have a common root directory. Arguments: *paths: One or many :class:`~pathlib.Path` instances sharing a common root folder_cache: The value to set for :attr:`folder_cache` on the root node Example: >>> root = PathNode.create_from_paths('a/a', 'a/b', 'a/c', folder_cache={}) >>> root <PathNode: "a"> >>> aa = root['a'] >>> aa <PathNode: "a/a"> >>> [node for node in root.walk()] [<PathNode: "a">, <PathNode: "a/a">, <PathNode: "a/b">, <PathNode: "a/c">] """ real_paths = [Path(p) for p in paths] roots = set([p.parts[0] for p in real_paths]) if len(roots) != 1: raise ValueError('No common root path found') root_part = PathPart(roots.pop()) root = PathNode(part=root_part, folder_cache=folder_cache) for path in real_paths: root.add(path) return root
@property def id(self) -> NodeId: """Unique id for the node """ return NodeId(str(self.full_path)) @property def node_pos(self) -> NodePos: """A :class:`namedtuple <NodePos>` of :attr:`id` and :attr:`nest_level` """ return NodePos(self.id, self.nest_level) @property def root(self) -> Self: """The root of the tree """ if self.parent is None: return self return self.parent.root @property def folder_id(self) -> FileId|None: """The :obj:`~.types.FileId` of the folder matching :attr:`full_path` (if it exists) """ return self._folder_id @folder_id.setter def folder_id(self, value: FileId|None) -> None: if value == self._folder_id: return self._folder_id = value self._cost = 1 if value is None else 0 @property def cost(self) -> int: """The total number of the node's descendants with no :attr:`folder_id` (including the node itself) Example: >>> a = PathNode.create_from_paths('a/b/c', folder_cache={}) >>> b = a['b'] >>> c = b['c'] >>> nodes = [a, b, c] Each node at this point has a local "cost" of ``1`` which is added to its parent's overall cost. >>> for node in nodes: ... print(node.cost, node.folder_id, node) 3 None a 2 None a/b 1 None a/b/c >>> c.folder_id = 'c_id' The last descendant now has a folder_id which reduces the cost up the tree >>> for node in nodes: ... print(node.cost, node.folder_id, node) 2 None a 1 None a/b 0 c_id a/b/c >>> a.folder_id = 'a_id' The root node now has a folder_id, but still has the cost of its children added. >>> for node in nodes: ... print(node.cost, node.folder_id, node) 1 a_id a 1 None a/b 0 c_id a/b/c >>> b.folder_id = 'b_id' Now all nodes have folder id's with a cost of zero. >>> for node in nodes: ... print(node.cost, node.folder_id, node) 0 a_id a 0 b_id a/b 0 c_id a/b/c """ c = self._cost for child in self: c += child.cost return c @property def folder_cache(self) -> FolderCache: """:obj:`~.types.FolderCache` used to populate the :attr:`folder_id` for all nodes in the tree This is only stored on the :attr:`root` node and will persist across the tree. This allows cache updates to be re-read by all nodes. """ fc = self.root._folder_cache assert fc is not None return fc
[docs] def add(self, item: PathPart|Path) -> PathNode: """Add a node (or nodes) to *self* from the given path If the given *item* is a :class:`~pathlib.Path` instance, it must contain this node's :attr:`part` at its root. Any sub-directories will be created in the tree and the node representing the final element of the path will be returned. If *item* is a :obj:`PathPart`, a child of *self* will be added and returned with its :attr:`part` set to that value. Example: >>> from pathlib import Path >>> a = PathNode('a', folder_cache={}) >>> b = a.add('b') >>> c = b.add('c') >>> a, b, c (<PathNode: "a">, <PathNode: "a/b">, <PathNode: "a/b/c">) This fails since the root is not included >>> foobar = a.add(Path('foo') / 'bar') Traceback (most recent call last): ... ValueError: path "foo/bar" is not relative to "a" >>> foobar = a.add(Path('a') / 'foo' / 'bar') >>> foobar <PathNode: "a/foo/bar"> """ if isinstance(item, Path): parts = [PathPart(p) for p in item.parts] if parts[0] != self.part: raise ValueError(f'path "{item}" is not relative to "{self.full_path}"') if len(parts) == 1: return self parts = parts[1:] child = self.get(parts[0]) if child is None: child = self.add(parts[0]) return child.add(Path(*parts)) if item in self: return self[item] child = PathNode(item, parent=self) self.children[item] = child return child
[docs] def find(self, path: PathLike|NodeId) -> PathNode|None: """Find a node with its :attr:`full_path` matching the given path The search will include *self* and all of its descendants. If no match is found, ``None`` is returned. Example: >>> a = PathNode('a', folder_cache={}) >>> b = a.add('b') >>> c = b.add('c') >>> a.find('a/b') is b True >>> a.find('a/b/c') is c True """ if not isinstance(path, Path): path = Path(path) for node in self.walk(): if node.full_path == path: return node
[docs] def update_from_cache(self) -> None: """Update from the :attr:`folder_cache` for this node and all of its descendants This method should be called after any updates to the cache are known to have occurred. The :attr:`path <node_path>` for each node is searched for on the cache and if found, sets the :attr:`folder_id`. This in turn will update the :attr:`cost` attribute for each affected node in the tree. Example: >>> folder_cache = {} >>> a = PathNode.create_from_paths('a/b/c', folder_cache=folder_cache) >>> b = a['b'] >>> c = b['c'] >>> nodes = [a, b, c] >>> for node in nodes: ... print(node.cost, node.folder_id, node) 3 None a 2 None a/b 1 None a/b/c Set the cached id for Path('a/b/c') then update the tree >>> folder_cache[Path('a/b/c')] = 'c_id' >>> a.update_from_cache() >>> for node in nodes: ... print(node.cost, node.folder_id, node) 2 None a 1 None a/b 0 c_id a/b/c """ self.folder_id = self.folder_cache.get(self.full_path) for child in self: child.update_from_cache()
[docs] def count(self) -> int: """Get the total number of descendants at this point in the tree (including *self*) Example: >>> a = PathNode.create_from_paths('a/b/c', folder_cache={}) >>> a.count() 3 >>> foo = a.add('foo') >>> a.count() 4 >>> bar = foo.add('bar') >>> foo.count() 2 >>> a.count() 5 """ i = 1 for child in self: i += child.count() return i
[docs] def get(self, key: PathPart) -> PathNode|None: """Get the child of *self* matching :attr:`part` If not found, ``None`` is returned. """ return self.children.get(key)
def __getitem__(self, key: PathPart) -> PathNode: return self.children[key] def __contains__(self, key: PathPart) -> bool: return key in self.children def __len__(self) -> int: return len(self.children) def __iter__(self) -> Iterator[PathNode]: yield from self.children.values() def walk_non_cached(self, breadth_first: bool = False) -> Iterator[PathNode]: for node in self.walk(breadth_first=breadth_first): if node.folder_id is None: yield node
[docs] def walk(self, breadth_first: bool = False) -> Iterator[PathNode]: """Recursively iterate over all descendants from this point in the tree If *breadth_first* is ``True``, the child nodes for each :attr:`nest_level` will be iterated before advancing deeper in the tree. If *breadth_first* is ``False`` (default), all descendants will be iterated to their deepest :attr:`nest_level` before advancing to the next branch (depth-first). Example: >>> paths = ['a/a/a', 'a/a/b', 'a/b/a', 'a/b/b', 'a/c'] >>> root = PathNode.create_from_paths(*paths, folder_cache={}) >>> for node in root.walk(): ... print(node) a a/a a/a/a a/a/b a/b a/b/a a/b/b a/c >>> for node in root.walk(breadth_first=True): ... print(node) a a/a a/b a/c a/a/a a/a/b a/b/a a/b/b """ def get_breadth_iters(node: PathNode) -> dict[NodePos, PathNode]: node_iters: dict[NodePos, PathNode] = {} node_iters[node.node_pos] = node for child in node: node_iters.update(get_breadth_iters(child)) return node_iters if breadth_first: node_iters = get_breadth_iters(self) cur_level = self.nest_level keys = list(node_iters.keys()) while len(keys): lvl_keys = [key for key in keys if key.level == cur_level] if not len(lvl_keys): assert not len(keys) for key in lvl_keys: node = node_iters[key] yield node keys.remove(key) cur_level += 1 else: yield self for child in self: yield from child.walk()
def __repr__(self) -> str: return f'<PathNode: "{self}">' def __str__(self) -> str: return str(self.full_path)