implement apscheduler

This commit is contained in:
deflax 2024-01-07 01:32:06 +02:00
parent dc96ce83ff
commit f2f10e7aa2

View file

@ -1,65 +1,57 @@
import os import os
import logging import logging
import json
import time import time
from datetime import datetime from datetime import datetime
import threading
import json
from flask import Flask, render_template, jsonify, request from flask import Flask, render_template, jsonify, request
from flask_apscheduler import APScheduler from apscheduler.schedulers.background import BackgroundScheduler
from core_client import Client from core_client import Client
logger = logging.getLogger('waitress')
logger.setLevel(os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper())
app = Flask(__name__) app = Flask(__name__)
scheduler = BackgroundScheduler()
scheduler = APScheduler() # Log handlers
scheduler.api_enabled = False logger_api = logging.getLogger('waitress')
scheduler.init_app(app) logger_api.setLevel(os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper())
scheduler.start()
database = {} logger_job = logging.getLogger('apscheduler')
prio = 0 logger_job.setLevel(logging.DEBUG)
head = {}
epg_json = open('/config/epg.json', 'r') # Variables
epg = json.load(epg_json) CORE_SYNC_PERIOD = 30
logger.info(epg)
for i in epg:
logger.info(i)
epg_json.close()
# Environment
api_hostname = os.environ.get('CORE_API_HOSTNAME') api_hostname = os.environ.get('CORE_API_HOSTNAME')
api_port = os.environ.get('CORE_API_PORT') api_port = os.environ.get('CORE_API_PORT')
api_username = os.environ.get('CORE_API_AUTH_USERNAME') api_username = os.environ.get('CORE_API_AUTH_USERNAME')
api_password=os.environ.get('CORE_API_AUTH_PASSWORD') api_password=os.environ.get('CORE_API_AUTH_PASSWORD')
# Init
database = {}
prio = 0
head = {}
epg_json = open('/config/epg.json', 'r')
epg = json.load(epg_json)
logger_api.info(epg)
for i in epg:
logger_api.info(i)
epg_json.close()
# Helper functions # Helper functions
def find_event_entry(events, target_name): def find_event_entry(events, target_name):
for entry in events: for entry in events:
if "name" in entry and entry["name"] == target_name: if "name" in entry and entry["name"] == target_name:
return {"start_at": entry.get("start_at"), "prio": entry.get("prio")} return {"start_at": entry.get("start_at"), "prio": entry.get("prio")}
return None return None
# Tasks
def tick():
print('Tick! The time is: %s' % datetime.now())
def stream_exec(stream_name, stream_prio, stream_hls_url): def stream_exec(stream_name, stream_prio, stream_hls_url):
global head global head
logger.info('Hello {}, your priority is'. format(stream_name, stream_prio)) logger_job.info('Hello {}, your priority is'. format(stream_name, stream_prio))
logger.info('HLS: ' + stream_hls_url) logger_job.info('HLS: ' + stream_hls_url)
head = { "head": stream_hls_url } head = { "head": stream_hls_url }
# Datarhei Core API integration
SYNC_PERIOD = 30
try:
client = Client(base_url='https://' + api_hostname, username=api_username, password=api_password)
logger.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname)
client.login()
except Exception as err:
logger.error('client login error')
logger.error(err)
def core_api_sync(): def core_api_sync():
global database global database
global epg global epg
@ -68,7 +60,7 @@ def core_api_sync():
try: try:
process_list = client.v3_process_get_list() process_list = client.v3_process_get_list()
except Exception as err: except Exception as err:
logger.error('client.v3_process_get_list ' + err) logger_job.error('client.v3_process_get_list ' + err)
return True return True
for process in process_list: for process in process_list:
try: try:
@ -77,7 +69,7 @@ def core_api_sync():
meta = get_process.metadata meta = get_process.metadata
state = get_process.state state = get_process.state
except Exception as err: except Exception as err:
logger.error('client.v3_process_get ' + err) logger_job.error('client.v3_process_get ' + err)
continue continue
if meta is None: if meta is None:
# Skip processes without metadata # Skip processes without metadata
@ -85,7 +77,7 @@ def core_api_sync():
else: else:
if meta['restreamer-ui'].get('meta') is None: if meta['restreamer-ui'].get('meta') is None:
# Skip processes without meta key # Skip processes without meta key
#logger.warn('{} does not have a meta key'.format(stream_id)) #logger_job.warn('{} does not have a meta key'.format(stream_id))
continue continue
new_ids.append(stream_id) new_ids.append(stream_id)
stream_name = meta['restreamer-ui']['meta']['name'] stream_name = meta['restreamer-ui']['meta']['name']
@ -100,27 +92,25 @@ def core_api_sync():
# Skip learned channels # Skip learned channels
continue continue
else: else:
logger.info('{} ({}) has been registered to the database'.format(stream_id, stream_name)) logger_job.info('{} ({}) has been registered to the database'.format(stream_id, stream_name))
epg_result = find_event_entry(epg, stream_name) epg_result = find_event_entry(epg, stream_name)
logger.info(epg_result) logger_job.info(epg_result)
#stream_prio = epg_result['prio'] #stream_prio = epg_result['prio']
stream_prio = 0 stream_prio = 0
try: try:
#stream_start_time = epg_result['start_at'] stream_start_hour = epg_result['start_at']
stream_start_time = "08:00" logger_job.info("Stream start hour is set to " + stream_start_hour)
logger.info("Start time is set to " + stream_start_time) scheduler.add_job(func=stream_exec, trigger='cron', hour=stream_start_hour, jitter=60, id=stream_id, args=(stream_name, stream_prio, stream_hls_url))
schedule.every().day.at(stream_start_time).do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id)
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
except KeyError: except KeyError:
logger.info("Stream should start a.s.a.p") logger_job.info("Stream should start now")
schedule.every().minute.do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id) scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url))
database.update(payload) database.update(payload)
else: else:
# Remove from the database if the state is changed # Remove from the database if the state is changed
if stream_id in database: if stream_id in database:
logger.info('{} ({}) has been removed from the database. Reason: {}'.format(stream_id, stream_name, state.exec)) logger_job.info('{} ({}) has been removed from the database. Reason: {}'.format(stream_id, stream_name, state.exec))
database.pop(stream_id) database.pop(stream_id)
schedule.clear(stream_id) scheduler.remove_job(stream_id)
new_ids.remove(stream_id) new_ids.remove(stream_id)
# Cleanup orphaned references # Cleanup orphaned references
orphan_keys = [] orphan_keys = []
@ -128,30 +118,42 @@ def core_api_sync():
if key in new_ids: if key in new_ids:
continue continue
else: else:
logger.info('Key {} is an orphan. Removing.'.format(key)) logger_job.info('Key {} is an orphan. Removing.'.format(key))
orphan_keys.append(key) orphan_keys.append(key)
for orphan_key in orphan_keys: for orphan_key in orphan_keys:
database.pop(orphan_key) database.pop(orphan_key)
schedule.clear(stream_id) scheduler.remove_job(stream_id)
# Debug Functions
def show_database(): def show_database():
global database global database
logger.info('Scheduler DB: ' + str(database)) logger_job.info('Scheduler DB: ' + str(database))
def show_scheduled_tasks(): def show_scheduled_tasks():
logger.info('Scheduler tasks:' + str(schedule.get_jobs())) logger_job.info('Scheduler tasks:' + str(scheduler.get_jobs()))
logger_job.info('Scheduler tasks:' + str(scheduler.print_jobs()))
# Login
try:
client = Client(base_url='https://' + api_hostname, username=api_username, password=api_password)
logger_job.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname)
client.login()
except Exception as err:
logger_job.error('client login error')
logger_job.error(err)
# Schedule tick
scheduler.add_job(func=tick, trigger="interval", seconds=3)
# Schedule datarhei core api sync # Schedule datarhei core api sync
core_api_sync() #core_api_sync()
scheduler.add_job(func=core_api_sync, trigger="interval", seconds=SYNC_PERIOD, id="core_api_sync", name="core_api_sync", replace_existing=True) scheduler.add_job(func=core_api_sync, trigger="interval", seconds=CORE_SYNC_PERIOD, id="core_api_sync")
# Schedule show db/tasks # Schedule show db/tasks
scheduler.add_job(func=core_api_sync, trigger="interval", seconds=60, id="core_api_sync", name="core_api_sync", replace_existing=True) scheduler.add_job(func=show_database, trigger="interval", seconds=60, id="show_database")
scheduler.add_job(func=core_api_sync, trigger="interval", seconds=60, id="core_api_sync", name="core_api_sync", replace_existing=True) scheduler.add_job(func=show_scheduled_tasks, trigger="interval", seconds=60, id="show_scheduled_tasks")
schedule.every().minute.do(show_database) scheduler.start()
schedule.every().minute.do(show_scheduled_tasks)
fallback = { "head": "https://stream.deflax.net/memfs/938a36f8-02ff-4452-a7e5-3b6a9a07cdfa.m3u8" } fallback = { "head": "https://stream.deflax.net/memfs/938a36f8-02ff-4452-a7e5-3b6a9a07cdfa.m3u8" }
head = fallback head = fallback
@ -159,6 +161,7 @@ head = fallback
@app.route('/', methods=['GET']) @app.route('/', methods=['GET'])
def root_query(): def root_query():
global head global head
logger_api.info('api ping')
return jsonify(head) return jsonify(head)
def create_app(): def create_app():