# scheduler.py
"""Periodic news collection: fetch feeds, filter articles, queue them for Telegram."""

import asyncio
import logging
from typing import Any, Dict, Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from config import SETTINGS, load_feeds
from fetch_news import NewsFetcher
from filter_news import NewsFilter
from telegram_bot import TelegramBot

logger = logging.getLogger(__name__)


class NewsScheduler:
    """Schedule one interval job per configured feed and forward results to the bot.

    Each job fetches a feed, runs the articles through ``NewsFilter`` and puts
    the surviving articles on ``TelegramBot.message_queue``.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        # Links already handled; shared with every fetcher call so that
        # concurrently running jobs see the same history.
        self.processed_links = set()
        # Upper bound on len(processed_links) enforced after each job.
        self.max_history = 1000
        self.filter = NewsFilter()
        self.bot = TelegramBot()
        self.running = False
        # Strong reference to the queue consumer task; the event loop only
        # keeps weak references, so an unreferenced task may be collected.
        self._queue_task: Optional["asyncio.Task[Any]"] = None

    async def start(self):
        """Start the scheduler and the bot's queue consumer (no-op if running)."""
        if self.running:
            return
        self.scheduler.start()
        self.running = True
        self._queue_task = asyncio.create_task(self.bot.process_queue())
        logger.info(
            "Scheduler gestartet mit %d Jobs", len(self.scheduler.get_jobs())
        )

    async def shutdown(self):
        """Stop the scheduler, cancel the queue consumer and close the bot session.

        No-op when the scheduler is not running.
        """
        if not self.running:
            return
        try:
            # AsyncIOScheduler.shutdown() (APScheduler 3.x) is a plain
            # synchronous call returning None; awaiting it raises TypeError.
            self.scheduler.shutdown()
            task, self._queue_task = self._queue_task, None
            if task is not None:
                # Stop the consumer before its session is closed underneath it.
                task.cancel()
                # return_exceptions=True absorbs the task's own
                # CancelledError/errors while still letting a cancellation of
                # shutdown() itself propagate.
                await asyncio.gather(task, return_exceptions=True)
            await self.bot.session.close()
        finally:
            self.running = False

    async def job_wrapper(self, feed: Dict[str, Any]):
        """Fetch one feed, filter its articles and enqueue them for sending.

        Args:
            feed: Feed configuration. ``feed["type"] == "rss"`` selects the
                RSS fetcher; any other value selects the news-API fetcher.
                ``feed["name"]`` is used for log output.

        Any exception raised while processing is logged and swallowed so the
        failure is confined to this run of the job.
        """
        try:
            async with NewsFetcher() as fetcher:
                if feed["type"] == "rss":
                    articles = await fetcher.process_rss(
                        feed, self.processed_links
                    )
                else:
                    articles = await fetcher.get_news_api(
                        feed, self.processed_links
                    )
                filtered = self.filter.filter_articles(articles)
                for article in filtered:
                    await self.bot.message_queue.put(article)
                self.cleanup_processed_links()
        except Exception as e:
            # .get(): a feed without "name" must not raise inside the handler.
            logger.exception(
                "Job für %s fehlgeschlagen: %s", feed.get("name", "<unnamed>"), e
            )

    def cleanup_processed_links(self):
        """Trim ``processed_links`` down to at most ``max_history`` entries.

        The set is trimmed in place so that jobs still holding a reference to
        it keep sharing the same object. Sets are unordered, so which entries
        are evicted is arbitrary (not "oldest first").
        """
        excess = len(self.processed_links) - self.max_history
        for _ in range(max(excess, 0)):
            self.processed_links.pop()

    def add_jobs(self):
        """Register one interval job per feed returned by ``load_feeds()``.

        A feed's own ``"interval"`` (minutes) takes precedence over
        ``SETTINGS["check_interval"]``.
        """
        for feed in load_feeds():
            trigger = IntervalTrigger(
                minutes=feed.get("interval", SETTINGS["check_interval"])
            )
            self.scheduler.add_job(
                self.job_wrapper,
                trigger=trigger,
                kwargs={"feed": feed},
                max_instances=3,
                name=f"Feed: {feed['name']}",
            )