From f2f10e7aa23ac468e9c1a02b74cd87ddce570a8c Mon Sep 17 00:00:00 2001 From: deflax Date: Sun, 7 Jan 2024 01:32:06 +0200 Subject: [PATCH] implement apscheduler --- src/scheduler/app.py | 125 ++++++++++++++++++++++--------------------- 1 file changed, 64 insertions(+), 61 deletions(-) diff --git a/src/scheduler/app.py b/src/scheduler/app.py index 4a88fa0..0af0165 100644 --- a/src/scheduler/app.py +++ b/src/scheduler/app.py @@ -1,65 +1,57 @@ import os import logging +import json import time from datetime import datetime -import threading -import json from flask import Flask, render_template, jsonify, request -from flask_apscheduler import APScheduler +from apscheduler.schedulers.background import BackgroundScheduler from core_client import Client -logger = logging.getLogger('waitress') -logger.setLevel(os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper()) - app = Flask(__name__) +scheduler = BackgroundScheduler() -scheduler = APScheduler() -scheduler.api_enabled = False -scheduler.init_app(app) -scheduler.start() +# Log handlers +logger_api = logging.getLogger('waitress') +logger_api.setLevel(os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper()) -database = {} -prio = 0 -head = {} +logger_job = logging.getLogger('apscheduler') +logger_job.setLevel(logging.DEBUG) -epg_json = open('/config/epg.json', 'r') -epg = json.load(epg_json) -logger.info(epg) - -for i in epg: - logger.info(i) - -epg_json.close() - -# Environment +# Variables +CORE_SYNC_PERIOD = 30 api_hostname = os.environ.get('CORE_API_HOSTNAME') api_port = os.environ.get('CORE_API_PORT') api_username = os.environ.get('CORE_API_AUTH_USERNAME') api_password=os.environ.get('CORE_API_AUTH_PASSWORD') +# Init +database = {} +prio = 0 +head = {} +epg_json = open('/config/epg.json', 'r') +epg = json.load(epg_json) +logger_api.info(epg) +for i in epg: + logger_api.info(i) +epg_json.close() + # Helper functions def find_event_entry(events, target_name): for entry in events: if "name" in entry and entry["name"] == target_name: return {"start_at": entry.get("start_at"), "prio": entry.get("prio")} return None + +# Tasks +def tick(): + print('Tick! The time is: %s' % datetime.now()) def stream_exec(stream_name, stream_prio, stream_hls_url): global head - logger.info('Hello {}, your priority is'. format(stream_name, stream_prio)) - logger.info('HLS: ' + stream_hls_url) + logger_job.info('Hello {}, your priority is'. format(stream_name, stream_prio)) + logger_job.info('HLS: ' + stream_hls_url) head = { "head": stream_hls_url } - -# Datarhei Core API integration -SYNC_PERIOD = 30 -try: - client = Client(base_url='https://' + api_hostname, username=api_username, password=api_password) - logger.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname) - client.login() -except Exception as err: - logger.error('client login error') - logger.error(err) - + def core_api_sync(): global database global epg @@ -68,7 +60,7 @@ def core_api_sync(): try: process_list = client.v3_process_get_list() except Exception as err: - logger.error('client.v3_process_get_list ' + err) + logger_job.error('client.v3_process_get_list ' + err) return True for process in process_list: try: @@ -77,7 +69,7 @@ def core_api_sync(): meta = get_process.metadata state = get_process.state except Exception as err: - logger.error('client.v3_process_get ' + err) + logger_job.error('client.v3_process_get ' + err) continue if meta is None: # Skip processes without metadata @@ -85,7 +77,7 @@ def core_api_sync(): else: if meta['restreamer-ui'].get('meta') is None: # Skip processes without meta key - #logger.warn('{} does not have a meta key'.format(stream_id)) + #logger_job.warn('{} does not have a meta key'.format(stream_id)) continue new_ids.append(stream_id) stream_name = meta['restreamer-ui']['meta']['name'] @@ -100,27 +92,25 @@ def core_api_sync(): # Skip learned channels continue else: - logger.info('{} ({}) has been registered to the database'.format(stream_id, stream_name)) + logger_job.info('{} ({}) has been registered to the database'.format(stream_id, stream_name)) epg_result = find_event_entry(epg, stream_name) - logger.info(epg_result) + logger_job.info(epg_result) #stream_prio = epg_result['prio'] stream_prio = 0 try: - #stream_start_time = epg_result['start_at'] - stream_start_time = "08:00" - logger.info("Start time is set to " + stream_start_time) - schedule.every().day.at(stream_start_time).do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id) - scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') + stream_start_hour = epg_result['start_at'] + logger_job.info("Stream start hour is set to " + stream_start_hour) + scheduler.add_job(func=stream_exec, trigger='cron', hour=stream_start_hour, jitter=60, id=stream_id, args=(stream_name, stream_prio, stream_hls_url)) except KeyError: - logger.info("Stream should start a.s.a.p") - schedule.every().minute.do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id) + logger_job.info("Stream should start now") + scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url)) database.update(payload) else: # Remove from the database if the state is changed if stream_id in database: - logger.info('{} ({}) has been removed from the database. Reason: {}'.format(stream_id, stream_name, state.exec)) + logger_job.info('{} ({}) has been removed from the database. Reason: {}'.format(stream_id, stream_name, state.exec)) database.pop(stream_id) - schedule.clear(stream_id) + scheduler.remove_job(stream_id) new_ids.remove(stream_id) # Cleanup orphaned references orphan_keys = [] @@ -128,30 +118,42 @@ def core_api_sync(): if key in new_ids: continue else: - logger.info('Key {} is an orphan. Removing.'.format(key)) + logger_job.info('Key {} is an orphan. Removing.'.format(key)) orphan_keys.append(key) for orphan_key in orphan_keys: database.pop(orphan_key) - schedule.clear(stream_id) + scheduler.remove_job(stream_id) -# Debug Functions def show_database(): global database - logger.info('Scheduler DB: ' + str(database)) + logger_job.info('Scheduler DB: ' + str(database)) def show_scheduled_tasks(): - logger.info('Scheduler tasks:' + str(schedule.get_jobs())) + logger_job.info('Scheduler tasks:' + str(scheduler.get_jobs())) + logger_job.info('Scheduler tasks:' + str(scheduler.print_jobs())) +# Login +try: + client = Client(base_url='https://' + api_hostname, username=api_username, password=api_password) + logger_job.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname) + client.login() +except Exception as err: + logger_job.error('client login error') + logger_job.error(err) + + +# Schedule tick +scheduler.add_job(func=tick, trigger="interval", seconds=3) + # Schedule datarhei core api sync -core_api_sync() -scheduler.add_job(func=core_api_sync, trigger="interval", seconds=SYNC_PERIOD, id="core_api_sync", name="core_api_sync", replace_existing=True) +#core_api_sync() +scheduler.add_job(func=core_api_sync, trigger="interval", seconds=CORE_SYNC_PERIOD, id="core_api_sync") # Schedule show db/tasks -scheduler.add_job(func=core_api_sync, trigger="interval", seconds=60, id="core_api_sync", name="core_api_sync", replace_existing=True) -scheduler.add_job(func=core_api_sync, trigger="interval", seconds=60, id="core_api_sync", name="core_api_sync", replace_existing=True) +scheduler.add_job(func=show_database, trigger="interval", seconds=60, id="show_database") +scheduler.add_job(func=show_scheduled_tasks, trigger="interval", seconds=60, id="show_scheduled_tasks") -schedule.every().minute.do(show_database) -schedule.every().minute.do(show_scheduled_tasks) +scheduler.start() fallback = { "head": "https://stream.deflax.net/memfs/938a36f8-02ff-4452-a7e5-3b6a9a07cdfa.m3u8" } head = fallback @@ -159,6 +161,7 @@ head = fallback @app.route('/', methods=['GET']) def root_query(): global head + logger_api.info('api ping') return jsonify(head) def create_app():