flask-apcheduler experiments

This commit is contained in:
deflax 2024-01-06 22:26:03 +00:00
parent a278d817cf
commit dc96ce83ff
2 changed files with 26 additions and 39 deletions

View file

@ -2,25 +2,33 @@ import os
import logging import logging
import time import time
from datetime import datetime from datetime import datetime
import schedule
import threading import threading
import json import json
from flask import Flask, render_template, jsonify, request from flask import Flask, render_template, jsonify, request
from flask_apscheduler import APScheduler
from core_client import Client from core_client import Client
app = Flask(__name__)
logger = logging.getLogger('waitress') logger = logging.getLogger('waitress')
logger.setLevel(os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper()) logger.setLevel(os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper())
app = Flask(__name__)
scheduler = APScheduler()
scheduler.api_enabled = False
scheduler.init_app(app)
scheduler.start()
database = {} database = {}
prio = 0 prio = 0
head = {} head = {}
epg_json = open('/config/epg.json', 'r') epg_json = open('/config/epg.json', 'r')
epg = json.load(epg_json) epg = json.load(epg_json)
logger.info(epg)
for i in epg: for i in epg:
logger.info(i) logger.info(i)
epg_json.close() epg_json.close()
# Environment # Environment
@ -30,30 +38,6 @@ api_username = os.environ.get('CORE_API_AUTH_USERNAME')
api_password=os.environ.get('CORE_API_AUTH_PASSWORD') api_password=os.environ.get('CORE_API_AUTH_PASSWORD')
# Helper functions # Helper functions
def run_continuously(interval=1):
"""Continuously run, while executing pending jobs at each
elapsed time interval.
@return cease_continuous_run: threading. Event which can
be set to cease continuous run. Please note that it is
*intended behavior that run_continuously() does not run
missed jobs*. For example, if you've registered a job that
should run every minute and you set a continuous run
interval of one hour then your job won't be run 60 times
at each interval but only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
def find_event_entry(events, target_name): def find_event_entry(events, target_name):
for entry in events: for entry in events:
if "name" in entry and entry["name"] == target_name: if "name" in entry and entry["name"] == target_name:
@ -62,12 +46,9 @@ def find_event_entry(events, target_name):
def stream_exec(stream_name, stream_prio, stream_hls_url): def stream_exec(stream_name, stream_prio, stream_hls_url):
global head global head
print('Hello {}, your priority is'. format(stream_name, stream_prio)) logger.info('Hello {}, your priority is'. format(stream_name, stream_prio))
print('HLS: ' + stream_hls_url) logger.info('HLS: ' + stream_hls_url)
head = stream_hls_url head = { "head": stream_hls_url }
# Start the background thread
stop_run_continuously = run_continuously()
# Datarhei Core API integration # Datarhei Core API integration
SYNC_PERIOD = 30 SYNC_PERIOD = 30
@ -122,11 +103,14 @@ def core_api_sync():
logger.info('{} ({}) has been registered to the database'.format(stream_id, stream_name)) logger.info('{} ({}) has been registered to the database'.format(stream_id, stream_name))
epg_result = find_event_entry(epg, stream_name) epg_result = find_event_entry(epg, stream_name)
logger.info(epg_result) logger.info(epg_result)
stream_prio = epg_result['prio'] #stream_prio = epg_result['prio']
stream_prio = 0
try: try:
stream_start_time = epg_result['start_at'] #stream_start_time = epg_result['start_at']
stream_start_time = "08:00"
logger.info("Start time is set to " + stream_start_time) logger.info("Start time is set to " + stream_start_time)
schedule.every().day.at(stream_start_time).do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id) schedule.every().day.at(stream_start_time).do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id)
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
except KeyError: except KeyError:
logger.info("Stream should start a.s.a.p") logger.info("Stream should start a.s.a.p")
schedule.every().minute.do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id) schedule.every().minute.do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id)
@ -153,16 +137,19 @@ def core_api_sync():
# Debug Functions # Debug Functions
def show_database(): def show_database():
global database global database
logger.info('Scheduler DB: ' + database) logger.info('Scheduler DB: ' + str(database))
def show_scheduled_tasks(): def show_scheduled_tasks():
logger.info('Scheduler tasks:' + schedule.get_jobs()) logger.info('Scheduler tasks:' + str(schedule.get_jobs()))
# Schedule datarhei core api sync # Schedule datarhei core api sync
core_api_sync() core_api_sync()
schedule.every(SYNC_PERIOD).seconds.do(core_api_sync) scheduler.add_job(func=core_api_sync, trigger="interval", seconds=SYNC_PERIOD, id="core_api_sync", name="core_api_sync", replace_existing=True)
# Schedule show db/tasks # Schedule show db/tasks
scheduler.add_job(func=core_api_sync, trigger="interval", seconds=60, id="core_api_sync", name="core_api_sync", replace_existing=True)
scheduler.add_job(func=core_api_sync, trigger="interval", seconds=60, id="core_api_sync", name="core_api_sync", replace_existing=True)
schedule.every().minute.do(show_database) schedule.every().minute.do(show_database)
schedule.every().minute.do(show_scheduled_tasks) schedule.every().minute.do(show_scheduled_tasks)

View file

@ -4,6 +4,6 @@ Flask==3.0.0
itsdangerous==2.1.2 itsdangerous==2.1.2
Jinja2==3.1.2 Jinja2==3.1.2
MarkupSafe==2.1.3 MarkupSafe==2.1.3
schedule==1.2.1
waitress==2.1.2 waitress==2.1.2
Werkzeug==3.0.1 Werkzeug==3.0.1
APScheduler==3.10.4