From e07261deed5dea23d83d3001e4b90732bc79c820 Mon Sep 17 00:00:00 2001
From: deflax
Date: Sat, 6 Jan 2024 06:03:02 +0200
Subject: [PATCH] implement schedule 1.2.1

---
 config/scheduler/epg.json      |  20 ++---
 src/scheduler/app.py           | 129 ++++++++++++++++++++++-----------
 src/scheduler/requirements.txt |   4 +-
 3 files changed, 98 insertions(+), 55 deletions(-)

diff --git a/config/scheduler/epg.json b/config/scheduler/epg.json
index 9686859..cbd4210 100644
--- a/config/scheduler/epg.json
+++ b/config/scheduler/epg.json
@@ -1,25 +1,25 @@
 [
   {
     "name": "IwayHigh",
-    "start_at": "1400",
-    "prio": 5
+    "start_at": "14:00",
+    "prio": 0
   },
   {
     "name": "Jungletrain",
-    "start_at": "2200",
-    "prio": 5
+    "start_at": "22:00",
+    "prio": 0
   },
   {
     "name": "Anima",
-    "start_at": "0600",
-    "prio": 5
-  },
-  {
-    "name": "Rodopsko Odealo",
-    "prio": 1
+    "start_at": "06:00",
+    "prio": 0
   },
   {
     "name": "Ines OBS",
+    "prio": 1
+  },
+  {
+    "name": "Rodopsko Odealo",
     "prio": 2
   }
 ]
diff --git a/src/scheduler/app.py b/src/scheduler/app.py
index 5e113af..ee70c71 100644
--- a/src/scheduler/app.py
+++ b/src/scheduler/app.py
@@ -1,16 +1,20 @@
 import os
 import logging
-import asyncio
+import time
 from datetime import datetime
-from schedule_manager import ScheduleManager
+import schedule
+import threading
+import json
 from flask import Flask, render_template, jsonify, request
 from core_client import Client
 
 app = Flask(__name__)
-manager = ScheduleManager()
 logger = logging.getLogger('waitress')
 logger.setLevel(os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper())
+
 database = {}
+prio = 0
+head = {}
 
 # Environment
 api_hostname = os.environ.get('CORE_API_HOSTNAME')
@@ -18,25 +22,50 @@ api_port = os.environ.get('CORE_API_PORT')
 api_username = os.environ.get('CORE_API_AUTH_USERNAME')
 api_password=os.environ.get('CORE_API_AUTH_PASSWORD')
 
-iway = {
-    "head": "https://stream.deflax.net/memfs/ac2172fa-d8d5-4487-8bc6-5347dcf7c0dc.m3u8"
-}
+# Helper functions
+def run_continuously(interval=1):
+    """Continuously run, while executing pending jobs at each
+    elapsed time interval.
+    @return cease_continuous_run: threading.Event which can
+    be set to cease continuous run. Please note that it is
+    *intended behavior that run_continuously() does not run
+    missed jobs*. For example, if you've registered a job that
+    should run every minute and you set a continuous run
+    interval of one hour then your job won't be run 60 times
+    at each interval but only once.
+    """
+    cease_continuous_run = threading.Event()
 
-cam = {
-    "head": "https://stream.deflax.net/memfs/37890510-5ff7-4387-866f-516468cea43f.m3u8"
-}
+    class ScheduleThread(threading.Thread):
+        @classmethod
+        def run(cls):
+            while not cease_continuous_run.is_set():
+                schedule.run_pending()
+                time.sleep(interval)
 
-jt = {
-    "head": "https://stream.deflax.net/memfs/6e5b4949-910d-4ec9-8ed8-1a3d8bb5138e.m3u8"
-}
+    continuous_thread = ScheduleThread()
+    continuous_thread.start()
+    return cease_continuous_run
 
-obs = {
-    "head": "https://stream.deflax.net/memfs/9502315a-bb95-4e3e-8c24-8661d6dd2fe8.m3u8"
-}
+def find_event_entry(events, target_name):
+    for entry in events:
+        if "name" in entry and entry["name"] == target_name:
+            return {"start_at": entry.get("start_at"), "prio": entry.get("prio")}
+    return None
 
-ines = {
-    "head": "https://stream.deflax.net/memfs/eddfeadd-1c72-4c5c-aabb-a99a1daa2231.m3u8"
-}
+# Load the epg.json config file
+with open('/config/epg.json', 'r') as json_file:
+    # Load the JSON data from the file
+    epg_json = json.load(json_file)
+
+def stream_exec(stream_name, stream_prio, stream_hls_url):
+    global head
+    print('Hello {}, your priority is {}'.format(stream_name, stream_prio))
+    print('HLS: ' + stream_hls_url)
+    head = stream_hls_url
+
+# Start the background thread
+stop_run_continuously = run_continuously()
 
 # Datarhei Core API integration
 SYNC_PERIOD = 30
@@ -47,14 +76,15 @@ try:
 except Exception as err:
     logger.error('client login error')
     logger.error(err)
-    
+
 def core_api_sync():
+    global database
+    global prio
    new_ids = []
    try:
        process_list = client.v3_process_get_list()
    except Exception as err:
-        logger.error('client.v3_process_get_list error')
-        logger.error(err)
+        logger.error('client.v3_process_get_list {}'.format(err))
        return True
    for process in process_list:
        try:
@@ -63,9 +93,7 @@ def core_api_sync():
            get_process = client.v3_process_get(id=process.id)
            meta = get_process.metadata
            state = get_process.state
        except Exception as err:
-            logger.error('client.v3_process_get error')
-            logger.error(err)
-            logger.error(get_process)
+            logger.error('client.v3_process_get {}'.format(err))
            continue
        if meta is None:
            # Skip processes without metadata
@@ -80,50 +108,65 @@ def core_api_sync():
            stream_description = meta['restreamer-ui']['meta']['description']
            stream_storage_type = meta['restreamer-ui']['control']['hls']['storage']
            stream_hls_url = 'https://{}/{}/{}.m3u8'.format(api_hostname, stream_storage_type, stream_id)
            payload = { stream_id: { 'name': stream_name, 'meta': stream_description, 'src': stream_hls_url } }
-            
+
            if state.exec == "running":
                # Register a running channel to the database
                if stream_id in database:
-                    # Skip overwriting channel
+                    # Skip learned channels
                    continue
                else:
                    logger.info('{} ({}) has been registered to the database'.format(stream_id, stream_name))
+                    epg_result = find_event_entry(epg_json, stream_name)
+                    stream_prio = epg_result['prio']
+                    stream_start_time = epg_result['start_at']
+                    if stream_start_time is not None:
+                        logger.info("Start time is set to " + stream_start_time)
+                        schedule.every().day.at(stream_start_time).do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id)
+                    else:
+                        logger.info("Stream should start a.s.a.p")
+                        schedule.every().minute.do(stream_exec, stream_name, stream_prio, stream_hls_url).tag(stream_id)
                    database.update(payload)
            else:
                # Remove from the database if the state is changed
                if stream_id in database:
                    logger.info('{} ({}) has been removed from the database. Reason: {}'.format(stream_id, stream_name, state.exec))
                    database.pop(stream_id)
+                    schedule.clear(stream_id)
                    new_ids.remove(stream_id)
 
-    # Cleanup orphaned references
-    marked_keys = []
+    orphan_keys = []
    for key in database:
        if key in new_ids:
            continue
        else:
            logger.info('Key {} is an orphan. Removing.'.format(key))
-            marked_keys.append(key)
+            orphan_keys.append(key)
+    for orphan_key in orphan_keys:
+        database.pop(orphan_key)
+        schedule.clear(orphan_key)
 
-    for marked_key in marked_keys:
-        database.pop(marked_key)
+# Debug Functions
+def show_database():
+    global database
+    logger.info('show database:')
+    logger.info(database)
+
+def show_scheduled_tasks():
+    logger.info('show tasks:')
+    logger.info(schedule.get_jobs())
 
-def analyze_db():
-    #logger.info(database)
-    return True
+# Schedule datarhei core api sync
+schedule.every(SYNC_PERIOD).minutes.do(core_api_sync)
 
-# Schedule a periodic task: sync datarhei core api
-manager.register_task(name="core_api_sync", job=core_api_sync).period(SYNC_PERIOD).start()
-
-# Schedule a periodic task: read the memory state
-manager.register_task(name="read_database", job=analyze_db).period(35).start()
+# Schedule show db/tasks
+schedule.every().minute.do(show_database)
+schedule.every().minute.do(show_scheduled_tasks)
 
 @app.route('/', methods=['GET'])
 def root_query():
-    playhead = iway
-    return jsonify(playhead)
+    global head
+    return jsonify(head)
 
 def create_app():
-    return app
+    return app
\ No newline at end of file
diff --git a/src/scheduler/requirements.txt b/src/scheduler/requirements.txt
index 4eba186..5eba75b 100644
--- a/src/scheduler/requirements.txt
+++ b/src/scheduler/requirements.txt
@@ -4,6 +4,6 @@ Flask==3.0.0
 itsdangerous==2.1.2
 Jinja2==3.1.2
 MarkupSafe==2.1.3
-schedule-manager==0.1.1
+schedule==1.2.1
 waitress==2.1.2
-Werkzeug==3.0.1
+Werkzeug==3.0.1
\ No newline at end of file
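
--
For reference, a minimal standalone sketch of the schedule 1.2.1 pattern this patch
adopts: tagged jobs driven by a background runner thread, with schedule.clear(tag)
cancelling a stream's jobs the way core_api_sync() does. The names switch_stream,
playhead, the 'stream-a' tag and the example URL are illustrative stand-ins, not
part of the codebase:

    import time
    import threading
    import schedule

    playhead = {}  # illustrative stand-in for the app's global `head`

    def switch_stream(name, prio, hls_url):
        # Plays the role of stream_exec(): repoint the playhead at a new HLS URL.
        global playhead
        playhead = {'name': name, 'prio': prio, 'head': hls_url}
        print('playhead -> {} (prio {})'.format(name, prio))

    def run_continuously(interval=1):
        # Same background-runner pattern as the patch: execute pending jobs in a
        # separate thread until the returned Event is set (daemon=True here only
        # so the demo process can exit cleanly).
        cease = threading.Event()

        class ScheduleThread(threading.Thread):
            @classmethod
            def run(cls):
                while not cease.is_set():
                    schedule.run_pending()
                    time.sleep(interval)

        ScheduleThread(daemon=True).start()
        return cease

    # Tag jobs with a stream id so they can be cancelled as a group, mirroring
    # .tag(stream_id) / schedule.clear(stream_id) in core_api_sync().
    schedule.every().day.at('14:00').do(
        switch_stream, 'IwayHigh', 0, 'https://example.invalid/iway.m3u8'
    ).tag('stream-a')

    stop = run_continuously()
    time.sleep(3)               # let the runner tick a few times
    schedule.clear('stream-a')  # drops every job tagged for that stream
    stop.set()                  # stops the background runner thread

Runs against schedule==1.2.1; schedule.clear(tag) with no matching jobs is a no-op,
which is why the sync loop can call it whenever a channel disappears.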