mirror of
https://github.com/zoriya/Autopipe.git
synced 2025-12-06 02:56:09 +00:00
97 lines
2.2 KiB
Python
97 lines
2.2 KiB
Python
from abc import ABC, abstractmethod
|
|
from typing import Generator, List, Union, Callable
|
|
from autopipe import ArgumentError
|
|
import logging
|
|
|
|
|
|
class APData(ABC):
|
|
@property
|
|
@abstractmethod
|
|
def type(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
class Pipe(ABC):
|
|
@property
|
|
@abstractmethod
|
|
def name(self):
|
|
raise NotImplementedError
|
|
|
|
def pipe(self, data: APData) -> APData:
|
|
logging.info(f"Entering pipe: {self.name}")
|
|
return data
|
|
|
|
def __call__(self, data: APData) -> APData:
|
|
return self.pipe(data)
|
|
|
|
|
|
class Input(ABC):
|
|
@property
|
|
@abstractmethod
|
|
def name(self):
|
|
raise NotImplementedError
|
|
|
|
@abstractmethod
|
|
def generate(self) -> Generator[APData, None, None]:
|
|
raise NotImplementedError
|
|
|
|
@property
|
|
@abstractmethod
|
|
def loop_cooldown(self) -> int:
|
|
"""
|
|
If negative or 0, the input can't be chained and once the generator return the program will exit.
|
|
If grater then 0, the generator will be called again after a sleep of x seconds where x is the return of this.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def __iter__(self):
|
|
return self.generate()
|
|
|
|
|
|
class Output(Pipe):
|
|
def __init__(self, pipe: Union[Pipe, Callable[[APData], APData], APData] = None):
|
|
if callable(pipe):
|
|
self.pipe = pipe
|
|
self.output = None
|
|
else:
|
|
self.output = pipe
|
|
self.pipe = None
|
|
|
|
@property
|
|
def name(self):
|
|
if self.pipe is None:
|
|
if self.output:
|
|
return "Static output"
|
|
raise NotImplementedError
|
|
return self.pipe.name if isinstance(self.pipe, Pipe) else self.pipe.__name__
|
|
|
|
def pipe(self, data: APData) -> APData:
|
|
super().pipe(data)
|
|
if self.pipe is None:
|
|
if self.output:
|
|
return self.output
|
|
raise NotImplementedError
|
|
return self.pipe(data)
|
|
|
|
|
|
class Coordinator(ABC):
|
|
def __init__(self):
|
|
logging.info(f"Using coordinator: {self.name()}")
|
|
|
|
@classmethod
|
|
@abstractmethod
|
|
def name(cls):
|
|
raise NotImplementedError
|
|
|
|
@property
|
|
@abstractmethod
|
|
def input(self) -> Input:
|
|
raise NotImplementedError
|
|
|
|
@property
|
|
def pipeline(self) -> List[Union[Pipe, Callable[[APData], Union[APData, Pipe]]]]:
|
|
return []
|
|
|
|
def default_handler(self, data: APData) -> Union[Pipe, Callable[[APData], Union[APData, Pipe]]]:
|
|
raise ArgumentError(f"No default argument handler for this coordinator, did you forget an Output() wrapper?")
|