Adding a trace log level, cleaning up

This commit is contained in:
Zoe Roux
2020-11-15 01:32:03 +01:00
parent 8e981861e2
commit d3dcb3b405
7 changed files with 102 additions and 45 deletions
+32 -34
View File
@@ -1,10 +1,9 @@
__all__ = ["Autopipe", "main", __all__ = ["Autopipe", "main",
"Coordinator", "Pipe", "Input", "APData", "Output", "LogLevel", "Coordinator", "Pipe", "Input", "APData", "Output",
"ArgumentError", "ArgumentError",
"input", "output", "pipe", "coordinators"] "input", "output", "pipe", "coordinators"]
from sys import stderr from .logging import LogLevel
from .exceptions import ArgumentError from .exceptions import ArgumentError
from .models import Coordinator, Pipe, Input, APData, Output from .models import Coordinator, Pipe, Input, APData, Output
from .autopipe import Autopipe from .autopipe import Autopipe
@@ -13,47 +12,46 @@ version = 1.0
autopipe: Autopipe autopipe: Autopipe
def main(argv=None): def _parse_args(argv=None):
import sys from sys import argv as sysargv
from autopipe import Autopipe, ArgumentError, coordinators from argparse import ArgumentParser, HelpFormatter
import logging
from argparse import ArgumentParser
parser = ArgumentParser(description="Easily run advanced pipelines in a daemon or in one run sessions.") class CustomHelpFormatter(HelpFormatter):
# noinspection PyProtectedMember
def _format_action_invocation(self, action):
if not action.option_strings or action.nargs == 0:
return super()._format_action_invocation(action)
default = self._get_default_metavar_for_optional(action)
args_string = self._format_args(action, default)
return ', '.join(action.option_strings) + ' ' + args_string
# noinspection PyTypeChecker
parser = ArgumentParser(description="Easily run advanced pipelines in a daemon or in one run sessions.",
formatter_class=CustomHelpFormatter)
parser.add_argument("coordinator", help="The name of your pipeline coordinator.", nargs="+") parser.add_argument("coordinator", help="The name of your pipeline coordinator.", nargs="+")
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {version}") parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {version}")
parser.add_argument("-v", "--verbose", choices=["debug", "info", "warn", "error"], nargs="?", const="info", parser.add_argument("-v", "--verbose", choices=list(LogLevel), nargs="?",
default="warn", dest="log_level", metavar="loglevel", const="info", default="warn", dest="level", metavar="lvl",
help="Set the logging level.", type=str.lower) help="Set the logging level. (default: warn ; available: %(choices)s)", type=LogLevel.parse)
parser.add_argument("-d", "--daemon", help="Enable the daemon mode (rerun input generators after a sleep cooldown", parser.add_argument("-d", "--daemon", help="Enable the daemon mode (rerun input generators after a sleep cooldown)",
action="store_true") action="store_true")
args = parser.parse_args(argv if argv is not None else sys.argv[1:]) return parser.parse_args(argv if argv is not None else sysargv[1:])
def main(argv=None):
from sys import stderr
from autopipe import Autopipe, ArgumentError, coordinators
import logging
args = _parse_args(argv)
try: try:
global autopipe global autopipe
autopipe = Autopipe(args.coordinator[0], args.coordinator[1:], autopipe = Autopipe(args.coordinator[0], args.coordinator[1:], log_level=args.level, daemon=args.daemon)
log_level=getattr(logging, args.log_level.upper()),
daemon=args.daemon)
return 0 return 0
except ArgumentError as e: except ArgumentError as e:
print(str(e), file=stderr) print(str(e), file=stderr)
if e.flag == "coordinator": if e.flag is not None:
print("Available coordinators:", file=stderr) e.print_more(args)
for coordinator in coordinators.__all__:
print(f"\t{coordinator.name()}", file=stderr)
if ':' in args.coordinator[0]:
try:
file, cls = args.coordinator[0].split(':')
except ValueError:
print(f"{args.coordinator[0]} is not a valid syntax. Did you meant to use file:class?", file=stderr)
return 2
print(f"Coordinators of ${file}:")
module = __import__(file)
for coordinator in module.__all__:
print(f"\t{coordinator.name()}", file=stderr)
else:
print("Or you can input a file anywhere on the system with the syntax: path/to/file.py:coordinator",
file=stderr)
return 2 return 2
except KeyboardInterrupt: except KeyboardInterrupt:
print("Interrupted by user", file=stderr) print("Interrupted by user", file=stderr)
+8 -7
View File
@@ -1,16 +1,17 @@
import json
import logging import logging
import time import time
import autopipe.coordinators as coordinators import autopipe.coordinators as coordinators
from typing import Callable, Union from typing import Callable, Union, List
from autopipe import APData, Coordinator, ArgumentError, Output, Pipe from autopipe import APData, Coordinator, ArgumentError, Output, Pipe, LogLevel
class Autopipe: class Autopipe:
def __init__(self, coordinator, coordinator_args, def __init__(self, coordinator: str, coordinator_args: List[str],
log_level=logging.WARNING, log_level: LogLevel = LogLevel.WARN,
daemon=False): daemon: bool = False):
logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level) logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level.value)
self.interceptors = [] self.interceptors = []
coordinator_class = self.get_coordinator(coordinator) coordinator_class = self.get_coordinator(coordinator)
@@ -52,7 +53,7 @@ class Autopipe:
data = pipe if isinstance(pipe, APData) else pipe.pipe(data) data = pipe if isinstance(pipe, APData) else pipe.pipe(data)
def _process_input(self, coordinator: Coordinator, data: APData) -> Union[APData, Pipe]: def _process_input(self, coordinator: Coordinator, data: APData) -> Union[APData, Pipe]:
logging.debug(data) logging.debug(f"Data: {json.dumps(data, indent=4)}")
interceptor = next((x for x in self.interceptors if x[1](data)), None) interceptor = next((x for x in self.interceptors if x[1](data)), None)
if interceptor: if interceptor:
+2 -2
View File
@@ -14,9 +14,9 @@ class DownloadExample(Coordinator):
return "DownloadExample" return "DownloadExample"
@property @property
def pipeline(self) -> List[Union[Pipe, Callable[..., APData]]]: def pipeline(self) -> List[Union[Pipe, Callable[[APData], Union[APData, Pipe]]]]:
return [Output(DownloaderPipe())] return [Output(DownloaderPipe())]
def get_input(self): def get_input(self):
return RssInput(f"http://www.obsrv.com/General/ImageFeed.aspx?{self.query}", return RssInput(f"http://www.obsrv.com/General/ImageFeed.aspx?{self.query}",
lambda x: FileData(x.title, x["media:content"], True)) lambda x: FileData(x.title, x["media_content"]["url"], True))
+23
View File
@@ -1,3 +1,6 @@
from sys import stderr
class ArgumentError(Exception): class ArgumentError(Exception):
def __init__(self, msg, flag=None): def __init__(self, msg, flag=None):
self.msg = msg self.msg = msg
@@ -5,3 +8,23 @@ class ArgumentError(Exception):
def __str__(self): def __str__(self):
return self.msg return self.msg
def print_more(self, args):
if self.flag == "coordinator":
import autopipe.coordinators as coordinators
print("Available coordinators:", file=stderr)
for coordinator in coordinators.__all__:
print(f"\t{coordinator.name()}", file=stderr)
if ':' in args.coordinator[0]:
try:
file, cls = args.coordinator[0].split(':')
except ValueError:
print(f"{args.coordinator[0]} is not a valid syntax. Did you meant to use file:class?", file=stderr)
return 2
print(f"Coordinators of ${file}:")
module = __import__(file)
for coordinator in module.__all__:
print(f"\t{coordinator.name()}", file=stderr)
else:
print("Or you can input a file anywhere on the system with the syntax: path/to/file.py:coordinator",
file=stderr)
+4
View File
@@ -1,3 +1,4 @@
import json
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Generator, Callable, List from typing import Generator, Callable, List
@@ -18,10 +19,13 @@ class RssInput(Input):
return "Rss" return "Rss"
def generate(self) -> Generator[APData, None, None]: def generate(self) -> Generator[APData, None, None]:
setattr(logging, "trace", lambda msg, *args, **kwargs: True)
logging.debug(f"Pulling the rss feed at {self.url}, last etag: {self.last_etag}, modif: {self.last_modified}") logging.debug(f"Pulling the rss feed at {self.url}, last etag: {self.last_etag}, modif: {self.last_modified}")
feed = feedparser.parse(self.url, etag=self.last_etag, modified=self.last_modified) feed = feedparser.parse(self.url, etag=self.last_etag, modified=self.last_modified)
if feed.status != 304: if feed.status != 304:
for entry in feed.entries: for entry in feed.entries:
logging.trace(f"Rss entry: {json.dumps(entry, indent=4)}")
yield self.mapper(entry) yield self.mapper(entry)
@property @property
+33
View File
@@ -0,0 +1,33 @@
import logging
from enum import Enum
class LogLevel(Enum):
TRACE = 5
VV = 5
DEBUG = logging.DEBUG
V = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARN
WARN = logging.WARN
ERROR = logging.ERROR
def __str__(self):
return self.name.lower()
@classmethod
def parse(cls, x):
for name, value in cls.__members__.items():
if x.upper() == name:
return value
def _log(self, msg, *args, **kwargs):
if self.isEnabledFor(LogLevel.TRACE.value):
self._log(LogLevel.TRACE.value, msg, args, **kwargs)
logging.addLevelName(LogLevel.TRACE.value, LogLevel.TRACE.name)
setattr(logging, LogLevel.TRACE.name, LogLevel.TRACE.value)
setattr(logging.getLoggerClass(), "trace", _log)
setattr(logging, "trace", lambda msg, *args, **kwargs: logging.log(LogLevel.TRACE.value, msg, *args, **kwargs))
-2
View File
@@ -2,8 +2,6 @@ from abc import ABC, abstractmethod
from typing import Generator, List, Union, Callable from typing import Generator, List, Union, Callable
import logging import logging
from autopipe import ArgumentError
class APData(ABC): class APData(ABC):
@property @property