mirror of
https://github.com/zoriya/Autopipe.git
synced 2025-12-06 02:56:09 +00:00
31 lines
907 B
Python
31 lines
907 B
Python
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Generator, Callable, List
|
|
from autopipe import Input, APData
|
|
import feedparser
|
|
|
|
|
|
class RssInput(Input):
|
|
def __init__(self, url: str, mapper: Callable[[List], APData], start_from_now: bool = True):
|
|
self.url = url
|
|
self.mapper = mapper
|
|
self.last_etag = None
|
|
self.last_modified = datetime.now() if start_from_now else None
|
|
|
|
@property
|
|
def name(self):
|
|
return "Rss"
|
|
|
|
def generate(self) -> Generator[APData, None, None]:
|
|
logging.debug(f"Pulling the rss feed at {self.url}, last etag: {self.last_etag}, modif: {self.last_modified}")
|
|
feed = feedparser.parse(self.url, etag=self.last_etag, modified=self.last_modified)
|
|
if feed.status != 304:
|
|
for entry in feed.entries:
|
|
logging.trace(f"Rss entry: {json.dumps(entry, indent=4)}")
|
|
yield self.mapper(entry)
|
|
|
|
@property
|
|
def loop_cooldown(self) -> int:
|
|
return 300
|