diff --git a/autopipe/__init__.py b/autopipe/__init__.py index cea28e9..dc514f8 100644 --- a/autopipe/__init__.py +++ b/autopipe/__init__.py @@ -20,16 +20,20 @@ def main(argv=None): from argparse import ArgumentParser parser = ArgumentParser(description="Easily run advanced pipelines in a daemon or in one run sessions.") - parser.add_argument("coordinator", help="The name of your pipeline coordinator.", nargs="+", dest="coord") + parser.add_argument("coordinator", help="The name of your pipeline coordinator.", nargs="+") parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {version}") parser.add_argument("-v", "--verbose", choices=["debug", "info", "warn", "error"], nargs="?", const="info", default="warn", dest="log_level", metavar="loglevel", help="Set the logging level.", type=str.lower) + parser.add_argument("-d", "--daemon", help="Enable the daemon mode (rerun input generators after a sleep cooldown", + action="store_true") args = parser.parse_args(argv if argv is not None else sys.argv[1:]) try: global autopipe - autopipe = Autopipe(args.coord[0], args.coord[1:], log_level=getattr(logging, args.log_level.upper())) + autopipe = Autopipe(args.coordinator[0], args.coordinator[1:], + log_level=getattr(logging, args.log_level.upper()), + daemon=args.daemon) return 0 except ArgumentError as e: print(str(e), file=stderr) diff --git a/autopipe/autopipe.py b/autopipe/autopipe.py index 1180763..c409ba3 100644 --- a/autopipe/autopipe.py +++ b/autopipe/autopipe.py @@ -7,7 +7,9 @@ from autopipe import APData, Coordinator, ArgumentError, Output, Pipe class Autopipe: - def __init__(self, coordinator, coordinator_args, log_level=logging.WARNING): + def __init__(self, coordinator, coordinator_args, + log_level=logging.WARNING, + daemon=False): logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level) self.interceptors = [] @@ -21,7 +23,7 @@ class Autopipe: while True: self.process_coordinator() sleep_time = self.coordinator.get_input().loop_cooldown - if sleep_time <= 0: + if sleep_time <= 0 or not daemon: logging.info("Input generator finished. Closing now.") break logging.info(f"Input generator finished. Starting again in {sleep_time} seconds.") diff --git a/autopipe/coordinators/download_example.py b/autopipe/coordinators/download_example.py index 8620f3c..a5538bf 100644 --- a/autopipe/coordinators/download_example.py +++ b/autopipe/coordinators/download_example.py @@ -11,7 +11,7 @@ class DownloadExample(Coordinator): @classmethod def name(cls): - return "NotifyExample" + return "DownloadExample" @property def pipeline(self) -> List[Union[Pipe, Callable[..., APData]]]: diff --git a/autopipe/models.py b/autopipe/models.py index 4361adf..5f370d2 100644 --- a/autopipe/models.py +++ b/autopipe/models.py @@ -13,17 +13,14 @@ class APData(ABC): class Pipe(ABC): - def __init__(self): - logging.info(f"Entering pipe: {self.name}") - @property @abstractmethod def name(self): raise NotImplementedError - @abstractmethod def pipe(self, data: APData) -> APData: - raise NotImplementedError + logging.info(f"Entering pipe: {self.name}") + return data def __call__(self, data: APData) -> APData: return self.pipe(data) @@ -57,7 +54,6 @@ class Input(ABC): class Output(Pipe): def __init__(self, pipe: Union[Pipe, Callable[[APData], APData], APData] = None): - super().__init__() if callable(pipe): self.pipe = pipe self.output = None @@ -71,9 +67,10 @@ class Output(Pipe): if self.output: return "Static output" raise NotImplementedError - return self.pipe.name + return self.pipe.name if isinstance(self.pipe, Pipe) else self.pipe.__name__ def pipe(self, data: APData) -> APData: + super().pipe(data) if self.pipe is None: if self.output: return self.output diff --git a/autopipe/pipe/downloader.py b/autopipe/pipe/downloader.py index 6446b81..530f7a1 100644 --- a/autopipe/pipe/downloader.py +++ b/autopipe/pipe/downloader.py @@ -20,39 +20,40 @@ class DownloaderPipe(Pipe): return "Downloader" def pipe(self, data: FileData) -> FileData: + super().pipe(data) if data.is_local: return data - if not force_refresh and os.path.isfile(path): - if not read: - return - with open(path, "r") as f: - return StringIO(f.read()) - - if message: - print(message) - r = requests.get(url, stream=progress) - try: - Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True) - with open(path, "wb") as f: - length = r.headers.get("content-length") - if progress and length: - local = 0 - length = int(length) - for chunk in r.iter_content(chunk_size=4096): - f.write(chunk) - local += len(chunk) - per = 50 * local // length - print(f"\r [{'#' * per}{'-' * (50 - per)}] ({sizeof_fmt(local)}/{sizeof_fmt(length)}) \r", - end='', flush=True) - else: - f.write(r.content) - if read: - return StringIO(r.content.decode(encoding)) - except KeyboardInterrupt: - os.remove(path) - if progress: - print() - print("Download cancelled") - raise + # if not force_refresh and os.path.isfile(path): + # if not read: + # return + # with open(path, "r") as f: + # return StringIO(f.read()) + # + # if message: + # print(message) + # r = requests.get(url, stream=progress) + # try: + # Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True) + # with open(path, "wb") as f: + # length = r.headers.get("content-length") + # if progress and length: + # local = 0 + # length = int(length) + # for chunk in r.iter_content(chunk_size=4096): + # f.write(chunk) + # local += len(chunk) + # per = 50 * local // length + # print(f"\r [{'#' * per}{'-' * (50 - per)}] ({sizeof_fmt(local)}/{sizeof_fmt(length)}) \r", + # end='', flush=True) + # else: + # f.write(r.content) + # if read: + # return StringIO(r.content.decode(encoding)) + # except KeyboardInterrupt: + # os.remove(path) + # if progress: + # print() + # print("Download cancelled") + # raise data.is_local = True return data diff --git a/autopipe/pipe/filter_pipe.py b/autopipe/pipe/filter_pipe.py index 6651613..9208b8f 100644 --- a/autopipe/pipe/filter_pipe.py +++ b/autopipe/pipe/filter_pipe.py @@ -4,7 +4,6 @@ from autopipe.models import Pipe, APData class FilterPipe(Pipe): def __init__(self, filter: Callable[[APData], bool]): - super().__init__() self.filter = filter @property @@ -12,4 +11,5 @@ class FilterPipe(Pipe): return "Filter" def pipe(self, data: APData) -> APData: + super().pipe(data) return data if self.filter(data) else None diff --git a/autopipe/pipe/tee.py b/autopipe/pipe/tee.py index 65402a5..572c1d6 100644 --- a/autopipe/pipe/tee.py +++ b/autopipe/pipe/tee.py @@ -3,7 +3,6 @@ from autopipe.models import Pipe, APData class TeePipe(Pipe): def __init__(self, *outputs): - super().__init__() self.outputs = outputs @property @@ -11,6 +10,7 @@ class TeePipe(Pipe): return "Tee" def pipe(self, data: APData) -> APData: + super().pipe(data) for output in self.outputs: output.pipe(data) return data