mirror of
https://github.com/zoriya/Autopipe.git
synced 2025-12-06 02:56:09 +00:00
Heavily reworking the pipeline & the error queue
This commit is contained in:
@@ -1,10 +1,12 @@
|
||||
__all__ = ["Autopipe", "main",
|
||||
"Coordinator", "Pipe", "Input", "APData",
|
||||
"Coordinator", "Pipe", "Input", "APData", "Output",
|
||||
"ArgumentError",
|
||||
"input", "output", "pipe", "coordinators"]
|
||||
|
||||
from sys import stderr
|
||||
|
||||
from .exceptions import ArgumentError
|
||||
from .models import Coordinator, Pipe, Input, APData
|
||||
from .models import Coordinator, Pipe, Input, APData, Output
|
||||
from .autopipe import Autopipe
|
||||
|
||||
version = 1.0
|
||||
@@ -28,11 +30,27 @@ def main(argv=None):
|
||||
Autopipe(args.coordinator[0], args.coordinator[1:], log_level=getattr(logging, args.log_level.upper()))
|
||||
return 0
|
||||
except ArgumentError as e:
|
||||
logging.error(str(e))
|
||||
print(str(e), file=stderr)
|
||||
if e.flag == "coordinator":
|
||||
logging.error("Available coordinators:")
|
||||
print("Available coordinators:", file=stderr)
|
||||
for coordinator in coordinators.__all__:
|
||||
logging.error(f" - {coordinator.name()}")
|
||||
print(f"\t{coordinator.name()}", file=stderr)
|
||||
if ':' in args.coordinator[0]:
|
||||
try:
|
||||
file, cls = args.coordinator[0].split(':')
|
||||
except ValueError:
|
||||
print(f"{args.coordinator[0]} is not a valid syntax. Did you meant to use file:class?", file=stderr)
|
||||
return 2
|
||||
print(f"Coordinators of ${file}:")
|
||||
module = __import__(file)
|
||||
for coordinator in module.__all__:
|
||||
print(f"\t{coordinator.name()}", file=stderr)
|
||||
else:
|
||||
print("Or you can input a file anywhere on the system with the syntax: path/to/file.py:coordinator",
|
||||
file=stderr)
|
||||
return 2
|
||||
except KeyboardInterrupt:
|
||||
print("Interrupted by user", file=stderr)
|
||||
return 2
|
||||
except Exception as ex:
|
||||
logging.error(ex, exc_info=logging.getLogger().getEffectiveLevel() <= logging.INFO)
|
||||
|
||||
@@ -1,50 +1,70 @@
|
||||
import logging
|
||||
import autopipe.coordinators as coordinators
|
||||
from typing import Callable, List, Union
|
||||
import time
|
||||
|
||||
from autopipe import APData, Coordinator, ArgumentError
|
||||
import autopipe.coordinators as coordinators
|
||||
from typing import Callable, Union
|
||||
from autopipe import APData, Coordinator, ArgumentError, Output, Pipe
|
||||
|
||||
|
||||
class Autopipe:
|
||||
def __init__(self, coordinator, coordinator_args, log_level=logging.WARNING):
|
||||
logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level)
|
||||
self.handlers = []
|
||||
self.interceptors = []
|
||||
|
||||
coordinator_class = self.get_coordinator(coordinator, coordinator_args)
|
||||
coordinator_class = self.get_coordinator(coordinator)
|
||||
if coordinator_class is None:
|
||||
raise ArgumentError(f"Invalid coordinator: {coordinator}", "coordinator")
|
||||
self.coordinator = coordinator_class(*coordinator_args)
|
||||
self.pipeline = self.coordinator.get_pipeline()
|
||||
|
||||
for data in self.coordinator.get_input():
|
||||
self._process_input(self.coordinator, data)
|
||||
while True:
|
||||
self.process_coordinator()
|
||||
sleep_time = self.coordinator.get_input().loop_cooldown
|
||||
if sleep_time <= 0:
|
||||
logging.info("Input generator finished. Closing now.")
|
||||
break
|
||||
logging.info(f"Input generator finished. Starting again in {sleep_time} seconds.")
|
||||
time.sleep(sleep_time)
|
||||
|
||||
@staticmethod
|
||||
def get_coordinator(coordinator: str, args: List[str]) -> Union[Callable, None]:
|
||||
def get_coordinator(coordinator: str) -> Union[Callable, None]:
|
||||
if coordinator == "-":
|
||||
return None # TODO support reading stdin as a coordinator file.
|
||||
try:
|
||||
return getattr(coordinators, coordinator)
|
||||
except AttributeError:
|
||||
try:
|
||||
module = __import__(coordinator)
|
||||
coordinator_class = getattr(module, args[0])
|
||||
del args[0]
|
||||
return coordinator_class
|
||||
mod, cls = coordinator.split(':')
|
||||
module = __import__(mod)
|
||||
return getattr(module, cls)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _process_input(self, coordinator: Coordinator, data: APData) -> APData:
|
||||
logging.debug(data)
|
||||
handler = next((x for x in self.handlers if x[1](data)), None)
|
||||
if handler is None:
|
||||
return coordinator.default_handler(data)
|
||||
# TODO rename handler interceptors
|
||||
# TODO use a pipe array as the base pipe selector
|
||||
# TODO allow anonymous pipe (a function instead of a pipe in the array)
|
||||
# TODO use the Output() class to end the pipeline, without this the default_handler is used for next items
|
||||
# TODO change the default_handler error to ask if the Output() was forgotten
|
||||
return handler(data)
|
||||
def process_coordinator(self):
|
||||
for data in self.coordinator.get_input():
|
||||
step = 0
|
||||
pipe = None
|
||||
while pipe is None or not isinstance(pipe, Output):
|
||||
pipe = self._process_input(self.coordinator, data, step)
|
||||
data = pipe if isinstance(pipe, APData) else pipe.pipe(data)
|
||||
step += 1
|
||||
|
||||
def pipe_handler(self, f, selector: Callable[[APData], bool]):
|
||||
self.handlers.append((f, selector))
|
||||
def _process_input(self, coordinator: Coordinator, data: APData, step: int) -> Union[APData, Pipe]:
|
||||
logging.debug(data)
|
||||
|
||||
interceptor = next((x for x in self.interceptors if x[1](data)), None)
|
||||
if interceptor:
|
||||
logging.info(f"Using interceptor: {interceptor.__name__}")
|
||||
return interceptor(data)
|
||||
|
||||
if len(self.pipeline) < step:
|
||||
if isinstance(self.pipeline[step], Pipe):
|
||||
return self.pipeline[step]
|
||||
return self.pipeline[step](data)
|
||||
|
||||
logging.info(f"Using default handler.")
|
||||
return coordinator.default_handler(data)
|
||||
|
||||
def interceptor(self, f, selector: Callable[[APData], bool]):
|
||||
self.interceptors.append((f, selector))
|
||||
return f
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import json
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Generator
|
||||
from typing import Generator, List, Union, Callable
|
||||
import logging
|
||||
|
||||
from autopipe import ArgumentError
|
||||
@@ -26,6 +25,9 @@ class Pipe(ABC):
|
||||
def pipe(self, data: APData) -> APData:
|
||||
raise NotImplementedError
|
||||
|
||||
def __call__(self, data: APData) -> APData:
|
||||
return self.pipe(data)
|
||||
|
||||
|
||||
class Input(ABC):
|
||||
def __init__(self):
|
||||
@@ -49,6 +51,35 @@ class Input(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def __iter__(self):
|
||||
return self.generate()
|
||||
|
||||
|
||||
class Output(Pipe):
|
||||
def __init__(self, pipe: Union[Pipe, Callable[[APData], APData], APData] = None):
|
||||
super().__init__()
|
||||
if callable(pipe):
|
||||
self.pipe = pipe
|
||||
self.output = None
|
||||
else:
|
||||
self.output = pipe
|
||||
self.pipe = None
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if self.pipe is None:
|
||||
if self.output:
|
||||
return "Static output"
|
||||
raise NotImplementedError
|
||||
return self.pipe.name
|
||||
|
||||
def pipe(self, data: APData) -> APData:
|
||||
if self.pipe is None:
|
||||
if self.output:
|
||||
return self.output
|
||||
raise NotImplementedError
|
||||
return self.pipe(data)
|
||||
|
||||
|
||||
class Coordinator(ABC):
|
||||
def __init__(self):
|
||||
@@ -60,8 +91,12 @@ class Coordinator(ABC):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_input(self):
|
||||
def get_input(self) -> Input:
|
||||
raise NotImplementedError
|
||||
|
||||
def default_handler(self, data):
|
||||
raise ArgumentError(f"No default argument handler for this coordinator. Data: {json.dumps(data)}")
|
||||
@property
|
||||
def get_pipeline(self) -> List[Union[Pipe, Callable[..., APData]]]:
|
||||
return []
|
||||
|
||||
def default_handler(self, data: APData) -> APData:
|
||||
raise ArgumentError(f"No default argument handler for this coordinator, did you forget an Output() wrapper?")
|
||||
|
||||
Reference in New Issue
Block a user