Creating coordinator logics, reworking the main function

This commit is contained in:
Zoe Roux
2020-11-08 19:16:09 +01:00
parent 25585be872
commit 3e2c2846d0
11 changed files with 89 additions and 19 deletions

View File

@@ -1,4 +1,40 @@
__all__ = ["Autopipe", "main", "available_coordinators"
"Coordinator", "Pipe", "Input", "APData",
"ArgumentError",
"input", "output", "pipe", "coordinators"]
from .exceptions import ArgumentError
from .models import Coordinator, Pipe, Input, APData
from .coordinators import *
from .autopipe import Autopipe
from input import *
from output import *
from pipe import *
version = 1.0
def main(argv=None):
import sys
from autopipe import Autopipe, ArgumentError, coordinators
import logging
from argparse import ArgumentParser
parser = ArgumentParser(description="Easily run advanced pipelines in a daemon or in one run sessions.")
parser.add_argument("coordinator", help="The name of your pipeline coordinator.", nargs="+")
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {version}")
parser.add_argument("-v", "--verbose", choices=["debug", "info", "warn", "error"], nargs="?", const="info",
default="warn", dest="log_level", metavar="loglevel",
help="Set the logging level.", type=str.lower)
args = parser.parse_args(argv if argv is not None else sys.argv[1:])
try:
Autopipe(args.coordinator[0], args.coordinator[1:], log_level=getattr(logging, args.log_level.upper()))
return 0
except ArgumentError as e:
logging.error(str(e))
if e.flag == "coordinator":
logging.error("Available coordinators:")
for coordinator in available_coordinators:
logging.error(f" - {coordinator.name()}")
return 2
except Exception as ex:
logging.error(ex, exc_info=logging.getLogger().getEffectiveLevel() <= logging.INFO)
return 1

View File

@@ -1,15 +1,5 @@
#!/usr/bin/env python3
import autopipe
import logging
from argparse import ArgumentParser
if __name__ == "__main__":
parser = ArgumentParser(description="Easily run advanced pipelines in a daemon or in one run sessions.")
parser.add_argument("coordinator", help="The name of your pipeline coordinator.")
parser.add_argument("-v", "--verbose", choices=["debug", "info", "warn", "error"], nargs="?", const="info",
default="warn", dest="log_level", metavar="loglevel",
help="Set the logging level.", type=str.lower)
args = parser.parse_args()
autopipe.Autopipe(args.coordinator, log_level=getattr(logging, args.log_level.upper()))
exit(autopipe.main())

View File

@@ -1,7 +1,10 @@
import logging
from autopipe import available_coordinators, ArgumentError
class Autopipe:
def __init__(self, coordinator, log_level=logging.WARNING):
def __init__(self, coordinator, coordinator_args, log_level=logging.WARNING):
logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level)
logging.info(f"Using coordinator: {coordinator}")
if coordinator not in available_coordinators:
raise ArgumentError(f"Invalid coordinator: {coordinator}", "coordinator")
self.coordinator = available_coordinators[coordinator](*coordinator_args)

View File

@@ -0,0 +1,3 @@
from .notify_example import NotifyExample
available_coordinators = [NotifyExample]

View File

@@ -0,0 +1,7 @@
from autopipe import Coordinator
class NotifyExample(Coordinator):
@classmethod
def name(cls):
return "NotifyExample"

View File

@@ -0,0 +1,7 @@
class ArgumentError(Exception):
def __init__(self, msg, flag=None):
self.msg = msg
self.flag = flag
def __str__(self):
return self.msg

View File

@@ -0,0 +1 @@
from .ArgumentError import ArgumentError

View File

@@ -0,0 +1 @@
from .rss import RssInput

View File

@@ -33,14 +33,24 @@ class Input(ABC):
raise NotImplementedError
@abstractmethod
def generate(self) -> Generator[APData]:
def generate(self) -> Generator[APData, None, None]:
raise NotImplementedError
@abstractmethod
@property
@abstractmethod
def loop_cooldown(self) -> int:
"""
If negative or 0, the input can't be chained and once the generator return the program will exit.
If grater then 0, the generator will be called again after a sleep of x seconds where x is the return of this.
"""
raise NotImplementedError
class Coordinator(ABC):
def __init__(self):
logging.info(f"Using coordinator: {self.name()}")
@classmethod
@abstractmethod
def name(cls):
raise NotImplementedError

View File

@@ -0,0 +1,2 @@
from .tee import TeePipe
from .filter_pipe import FilterPipe

View File

@@ -1 +0,0 @@
../autopipe/__main__.py

11
bin/autopipe Executable file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python3
try:
import autopipe
except ModuleNotFoundError:
import sys
import os
sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), "..")))
import autopipe
if __name__ == "__main__":
exit(autopipe.main())