Source code for being.content

"""Content manager. Manages motions inside content directory."""
import collections
import glob
import os
from collections import OrderedDict
from typing import Callable, Iterator, Optional

from being.configuration import CONFIG
from being.curve import Curve
from being.logging import get_logger
from being.pubsub import PubSub
from being.serialization import loads, dumps
from being.spline import BPoly, split_spline
from being.utils import SingleInstanceCache, read_file, rootname, write_file


CONTENT_CHANGED: str = 'CONTENT_CHANGED'
"""String literal for content changed PubSub event."""

DEFAULT_DIRECTORY: str = CONFIG['General']['CONTENT_DIRECTORY']
"""Default content directory. Taken from :obj:`being.configuration.CONFIG`."""


[docs]def stripext(p: str) -> str: """Strip file extension from path. Args: p: Input path. Returns: Input path without extension part. Example: >>> stripext('this/is/a_file.ext') 'this/is/a_file' """ root, _ = os.path.splitext(p) return root
[docs]def removeprefix(string: str, prefix: str) -> str: """Remove string prefix for Python < 3.9. If string does not start with prefix leave as is. Args: string: Input string. prefix: Prefix to remove. Returns: Trimmed string. """ if string.startswith(prefix): return string[len(prefix):] return string
[docs]def upgrade_splines_to_curves(directory, logger=None): """Go through each JSON file inside directory and upgrade every serialized spline to a curve. Args: directory: Folder to check. logger (optional): Logger to write informations to (e.g. Instance logger of calling :class:`Content` instance). """ if logger is None: logger = get_logger('upgrade_splines_to_curves()') for fp in glob.iglob(directory + '/*.json'): obj = loads(read_file(fp)) if isinstance(obj, Curve): pass elif isinstance(obj, BPoly): curve = Curve(split_spline(obj)) logger.info('Upgrading spline to curve %r', fp) write_file(fp, dumps(curve)) else: logger.warning('Do not know what to do with obj %r', obj)
[docs]class Files(collections.MutableMapping): """Wrap files inside directory on disk as dictionary. Iteration order is most recently modified. """ def __init__(self, directory: str, loads: Callable = loads, dumps: Callable = dumps): """ Args: directory: Directory to manage. loads (optional): Serialization loader function. Default is :func:`being.serialization.loads`. dumps (optional): Serialization dumper function. Default is :func:`being.serialization.dumps`. """ self.directory: str = directory """Directory to manage.""" self.loads: Callable = loads """Serialization loader function.""" self.dumps: Callable = dumps """Serialization dumper function.""" os.makedirs(self.directory, exist_ok=True) def _fullpath(self, path: str) -> str: """Resolve fullpath.""" return os.path.join(self.directory, path) def _remove_directory(self, filepath): prefix = self.directory + '/' return removeprefix(filepath, prefix) def _recently_modified(self) -> Iterator[str]: """Most recently modified filenames.""" filepaths = glob.iglob(self.directory + '/*') mostRecently = sorted(filepaths, key=os.path.getmtime, reverse=True) return map(self._remove_directory, mostRecently) def _alphabetically(self) -> Iterator[str]: """Alphabetically ordered filenames.""" filepaths = glob.iglob(self.directory + '/*') mostRecently = sorted(filepaths) return map(self._remove_directory, mostRecently) def __getitem__(self, path: str) -> Iterator[str]: fp = self._fullpath(path) return self.loads(read_file(fp)) def __setitem__(self, path: str, value: object): fp = self._fullpath(path) write_file(fp, self.dumps(value)) def __delitem__(self, path: str): fp = self._fullpath(path) os.remove(fp) def __iter__(self): return iter(self._alphabetically()) def __len__(self): return len(self._alphabetically()) def __contains__(self, path: str): # Skip __iter__ fp = self._fullpath(path) return os.path.exists(fp) def __str__(self): return '%s(directory=%r)' % (type(self).__name__, self.directory)
[docs]class Content(PubSub, SingleInstanceCache): """Content manager. For now only motions and splines. Motion name is the basename without file extension. Todo: - Extend for all kind of files (multiple subfolders). - :class:`being.utils.NestedDict` appropriate? """ def __init__(self, directory: str = DEFAULT_DIRECTORY, data: Optional[Files] = None, ext: str = '.json', ): """ Args: directory (optional): Directory to manage. Default content directory from :obj:`being.configuration.CONFIG`. data (optional): Data container. ext (optional): File extensions. name + ext = path. """ if data is None: data = Files(directory) else: directory = None super().__init__(events=[CONTENT_CHANGED]) self.directory = directory self.data = data self.ext = ext self.logger = get_logger(str(self)) if self.directory is not None: upgrade_splines_to_curves(self.directory, self.logger)
[docs] def curve_exists(self, name: str) -> bool: """Check if motion curve exists. Args: name: Curve name. Returns: If curve exists. """ return name + self.ext in self.data
[docs] def load_curve(self, name: str) -> BPoly: """Load miotion curve from disk. Args: name: Motion name. Returns: Spline """ return self.data[name + self.ext]
[docs] def save_curve(self, name: str, curve: BPoly): """Save motion curve to disk. Args: name: Curve name. curve: Motion curve to save. """ self.data[name + self.ext] = curve self.publish(CONTENT_CHANGED)
[docs] def delete_curve(self, name: str): """Delete motion curve from disk. Args: name: Curve name. """ del self.data[name + self.ext] self.publish(CONTENT_CHANGED)
[docs] def rename_curve(self, oldName: str, newName: str): """Rename motion curve. Args: oldName: Old curve name. newName: New curve name. """ oldPath = oldName + self.ext newPath = newName + self.ext self.data[newPath] = self.data.pop(oldPath) self.publish(CONTENT_CHANGED)
[docs] def find_free_name(self, wishName='Untitled'): """Find free name. Append numbers starting from 1 if name is already taken. Args: wishName: Wish name. Returns: Available version of wish name. Raises: RuntimeError: If we can not find any available name (after x tries...) """ names = set(map(rootname, self.data)) if wishName not in names: return wishName for number in range(1, 100): name = f'{wishName} {number}' if name not in names: return name raise RuntimeError('Can not find any free name!')
[docs] def list_curve_names(self) -> list: """List current curve names.""" return list(map(stripext, self.data))
[docs] def forge_message(self) -> OrderedDict: """Forge content / motions message.""" # TODO: Rename type motions -> curves ??? return OrderedDict([ ('type', 'motions'), ('curves', [ (stripext(path), motion) for path, motion in self.data.items() ]), ])
def __str__(self): return '%s(directory=%r)' % (type(self).__name__, self.directory)