From cf3fc9ef488b61b461e81d27ddb0951a40d88ff1 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Thu, 12 Nov 2020 23:15:13 +0100 Subject: [PATCH] Creating a good coordinator loader & a pipeline selector --- autopipe/__init__.py | 5 +-- autopipe/autopipe.py | 38 +++++++++++++++- autopipe/coordinators/__init__.py | 9 +++- autopipe/coordinators/notify_example.py | 9 ++-- autopipe/input/rss.py | 8 ++-- autopipe/models.py | 15 ++++--- autopipe/pipe/__init__.py | 1 + autopipe/pipe/downloader.py | 58 +++++++++++++++++++++++++ 8 files changed, 124 insertions(+), 19 deletions(-) create mode 100644 autopipe/pipe/downloader.py diff --git a/autopipe/__init__.py b/autopipe/__init__.py index 531965c..c5f5393 100644 --- a/autopipe/__init__.py +++ b/autopipe/__init__.py @@ -1,11 +1,10 @@ -__all__ = ["Autopipe", "main", "available_coordinators" +__all__ = ["Autopipe", "main", "Coordinator", "Pipe", "Input", "APData", "ArgumentError", "input", "output", "pipe", "coordinators"] from .exceptions import ArgumentError from .models import Coordinator, Pipe, Input, APData -from .coordinators import * from .autopipe import Autopipe version = 1.0 @@ -32,7 +31,7 @@ def main(argv=None): logging.error(str(e)) if e.flag == "coordinator": logging.error("Available coordinators:") - for coordinator in available_coordinators: + for coordinator in coordinators.__all__: logging.error(f" - {coordinator.name()}") return 2 except Exception as ex: diff --git a/autopipe/autopipe.py b/autopipe/autopipe.py index 0f1ce76..0e9fa65 100644 --- a/autopipe/autopipe.py +++ b/autopipe/autopipe.py @@ -1,11 +1,45 @@ import logging -from autopipe import available_coordinators, ArgumentError +import autopipe.coordinators as coordinators +from typing import Callable, List, Union + +from autopipe import APData, Coordinator, ArgumentError class Autopipe: def __init__(self, coordinator, coordinator_args, log_level=logging.WARNING): logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level) - coordinator_class = next((i for i in available_coordinators if i.name() == coordinator), None) + self.handlers = [] + + coordinator_class = self.get_coordinator(coordinator, coordinator_args) if coordinator_class is None: raise ArgumentError(f"Invalid coordinator: {coordinator}", "coordinator") self.coordinator = coordinator_class(*coordinator_args) + + for data in self.coordinator.get_input(): + self._process_input(self.coordinator, data) + + @staticmethod + def get_coordinator(coordinator: str, args: List[str]) -> Union[Callable, None]: + if coordinator == "-": + return None # TODO support reading stdin as a coordinator file. + try: + return getattr(coordinators, coordinator) + except AttributeError: + try: + module = __import__(coordinator) + coordinator_class = getattr(module, args[0]) + del args[0] + return coordinator_class + except Exception: + return None + + def _process_input(self, coordinator: Coordinator, data: APData) -> APData: + logging.debug(data) + handler = next((x for x in self.handlers if x[1](data)), None) + if handler is None: + return coordinator.default_handler(data) + return handler(data) + + def pipe_handler(self, f, selector: Callable[[APData], bool]): + self.handlers.append((f, selector)) + return f diff --git a/autopipe/coordinators/__init__.py b/autopipe/coordinators/__init__.py index 75af45e..4c0936f 100644 --- a/autopipe/coordinators/__init__.py +++ b/autopipe/coordinators/__init__.py @@ -1,3 +1,10 @@ from .notify_example import NotifyExample -available_coordinators = [NotifyExample] +__all__ = [NotifyExample] + + +def __getattr__(name): + obj = next((x for x in __all__ if x.name().casefold() == name.casefold()), None) + if obj is None: + raise AttributeError(f"No coordinator found with the name: {name}.") + return obj diff --git a/autopipe/coordinators/notify_example.py b/autopipe/coordinators/notify_example.py index ba7e971..aba6a21 100644 --- a/autopipe/coordinators/notify_example.py +++ b/autopipe/coordinators/notify_example.py @@ -1,16 +1,17 @@ from autopipe import Coordinator from autopipe.input import RssInput +from autopipe.pipe import FileData class NotifyExample(Coordinator): - def __init__(self, url, mapper): + def __init__(self, query): super().__init__() - self.url = url - self.mapper = mapper + self.query = query @classmethod def name(cls): return "NotifyExample" def get_input(self): - return RssInput(self.url, self.mapper) + return RssInput(f"http://www.obsrv.com/General/ImageFeed.aspx?{self.query if self.query else 'raccoon'}", + lambda x: FileData(x.title, x["media:content"], True)) diff --git a/autopipe/input/rss.py b/autopipe/input/rss.py index a9bd2f3..ebf818f 100644 --- a/autopipe/input/rss.py +++ b/autopipe/input/rss.py @@ -1,12 +1,12 @@ import logging from datetime import datetime -from typing import Generator, Callable -from models import Input, APData +from typing import Generator, Callable, List +from autopipe import Input, APData import feedparser class RssInput(Input): - def __init__(self, url: str, mapper: Callable[[[]], APData], start_from_now: bool = True): + def __init__(self, url: str, mapper: Callable[[List], APData], start_from_now: bool = True): super().__init__() self.url = url self.mapper = mapper @@ -17,7 +17,7 @@ class RssInput(Input): def name(self): return "Rss" - def generate(self) -> Generator[APData]: + def generate(self) -> Generator[APData, None, None]: logging.debug(f"Pulling the rss feed at {self.url}, last etag: {self.last_etag}, modif: {self.last_modified}") feed = feedparser.parse(self.url, etag=self.last_etag, modified=self.last_modified) if feed.status != 304: diff --git a/autopipe/models.py b/autopipe/models.py index 63a8a5e..38da50c 100644 --- a/autopipe/models.py +++ b/autopipe/models.py @@ -1,12 +1,16 @@ +import json from abc import ABC, abstractmethod from typing import Generator import logging +from autopipe import ArgumentError -class APData: - def __init__(self): - self.value = None - self.type = None + +class APData(ABC): + @property + @abstractmethod + def type(self): + raise NotImplementedError class Pipe(ABC): @@ -59,4 +63,5 @@ class Coordinator(ABC): def get_input(self): raise NotImplementedError - + def default_handler(self, data): + raise ArgumentError(f"No default argument handler for this coordinator. Data: {json.dumps(data)}") diff --git a/autopipe/pipe/__init__.py b/autopipe/pipe/__init__.py index 5d86fab..f826df3 100644 --- a/autopipe/pipe/__init__.py +++ b/autopipe/pipe/__init__.py @@ -1,2 +1,3 @@ from .tee import TeePipe from .filter_pipe import FilterPipe +from .downloader import DownloaderPipe, FileData diff --git a/autopipe/pipe/downloader.py b/autopipe/pipe/downloader.py new file mode 100644 index 0000000..6446b81 --- /dev/null +++ b/autopipe/pipe/downloader.py @@ -0,0 +1,58 @@ +import logging + +from autopipe import Pipe, APData + + +class FileData(APData): + def __init__(self, name, link, is_locale=True): + self.name = name + self.link = link + self.is_local = is_locale + + @property + def type(self): + return "File" + + +class DownloaderPipe(Pipe): + @property + def name(self): + return "Downloader" + + def pipe(self, data: FileData) -> FileData: + if data.is_local: + return data + if not force_refresh and os.path.isfile(path): + if not read: + return + with open(path, "r") as f: + return StringIO(f.read()) + + if message: + print(message) + r = requests.get(url, stream=progress) + try: + Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True) + with open(path, "wb") as f: + length = r.headers.get("content-length") + if progress and length: + local = 0 + length = int(length) + for chunk in r.iter_content(chunk_size=4096): + f.write(chunk) + local += len(chunk) + per = 50 * local // length + print(f"\r [{'#' * per}{'-' * (50 - per)}] ({sizeof_fmt(local)}/{sizeof_fmt(length)}) \r", + end='', flush=True) + else: + f.write(r.content) + if read: + return StringIO(r.content.decode(encoding)) + except KeyboardInterrupt: + os.remove(path) + if progress: + print() + print("Download cancelled") + raise + data.is_local = True + return data