# Source: app.py (1.34 kB)
# Last commit: afdde10 "Update app.py" by Muhammad Abdur Rahman Saad
import logging
from flask import Flask
from flask_apscheduler import APScheduler
from asgiref.wsgi import WsgiToAsgi
from routes.main import bp
from main import main as data_collection_main
class Config:
    """Flask configuration loaded by ``create_app`` via ``config.from_object``."""

    # Enables the scheduler's API
    SCHEDULER_API_ENABLED = True
def create_app() -> Flask:
    """Build the Flask application and start its background scheduler.

    The factory loads ``Config``, registers the ``bp`` blueprint, sets up
    root logging at ERROR level, and starts an APScheduler instance with a
    single cron job that calls ``data_collection_main`` at 16:00.

    Returns:
        The configured Flask application.
    """
    # Create the Flask application
    flask_app = Flask(__name__)

    # Load config for APScheduler
    flask_app.config.from_object(Config())

    # Register our Blueprint
    flask_app.register_blueprint(bp)

    # Configure the root logger's format, then restrict it to errors only.
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s')
    logging.getLogger().setLevel(logging.ERROR)

    # Initialize the APScheduler and bind it to this application
    scheduler = APScheduler()
    scheduler.init_app(flask_app)

    # Schedule a cron job to run main() every day at 16:00
    @scheduler.task('cron', id='daily_data_collection', hour=16, minute=0)
    def scheduled_data_collection():
        """
        This function runs automatically every day at 16:00 server time.
        It calls the Prefect flow defined in 'main.py' to collect data.
        """
        data_collection_main()

    # Start the scheduler; this happens on every call to create_app()
    scheduler.start()

    return flask_app
# Module-level WSGI application; built (and its scheduler started) at import.
app = create_app()

# The same application wrapped with WsgiToAsgi so it can be served as ASGI.
asgi_app = WsgiToAsgi(app)

if __name__ == '__main__':
    # Executed directly: run the already-created Flask app
    app.run()