From a412d8c20520c0a8d89eecdde0af0b6c11b35528 Mon Sep 17 00:00:00 2001
From: Daniel afx
Date: Fri, 12 Jan 2024 16:54:53 +0200
Subject: [PATCH] implement stream recorder

---
 src/scheduler/app.py | 73 +++++++++++++++++++++++++++-----------------
 1 file changed, 45 insertions(+), 28 deletions(-)

diff --git a/src/scheduler/app.py b/src/scheduler/app.py
index ff59cec..dd517c4 100644
--- a/src/scheduler/app.py
+++ b/src/scheduler/app.py
@@ -27,8 +27,8 @@ api_password = os.environ.get('CORE_API_AUTH_PASSWORD', 'pass')
 # Init
 database = {}
 prio = 0
-recorder = False
-head = {}
+playhead = {}  # stream currently on air: {"id", "prio", "head"}
+rechead = ""   # stream_id currently being recorded; "" means the recorder is idle
 
 with open('/config/epg.json', 'r') as epg_json:
     epg = json.load(epg_json)
@@ -44,6 +44,7 @@ def get_core_process_details(client, process_id):
 
 # Helper function to process a running channel
 def process_running_channel(database, scheduler, stream_id, stream_name, stream_description, stream_hls_url):
+    global playhead
     if stream_id in database:
         # Skip learned channels
         return
@@ -57,23 +58,26 @@ def process_running_channel(database, scheduler, stream_id, stream_name, stream_
         logger_job.warning(f'{stream_id} ({stream_name}) has been registered to the database')
         if stream_start == "now":
             logger_job.warning("Stream should start now")
-            scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
+            scheduler.add_job(func=exec_stream, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
+            rec_id = f'rec_{stream_id}'
+            scheduler.add_job(func=exec_recorder, id=rec_id, args=(stream_id, stream_hls_url))
         else:
             logger_job.warning(f"Stream start hour is set to {stream_start}")
             scheduler.add_job(
-                func=stream_exec, trigger='cron', hour=stream_start, jitter=60,
+                func=exec_stream, trigger='cron', hour=stream_start, jitter=60,
                 id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url)
             )
         database.update({stream_id: {'name': stream_name, 'start_at': stream_start, 'meta': stream_description, 'src': stream_hls_url}})
 
         # Bootstrap the playhead if its still empty.
-        if head == {}:
+        if playhead == {}:
             fallback = fallback_search(database)
-            scheduler.add_job(func=stream_exec, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
+            scheduler.add_job(func=exec_stream, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
 
 # Helper function to remove channel from the database
 def remove_channel_from_database(database, scheduler, stream_id, stream_name, state):
     global prio
-    global head
+    global playhead
+    global rechead
     if stream_id in database:
         logger_job.warning(f'{stream_id} ({stream_name}) will be removed from the database. Reason: {state.exec}')
         database.pop(stream_id)
@@ -81,13 +85,19 @@ def remove_channel_from_database(database, scheduler, stream_id, stream_name, st
             scheduler.remove_job(stream_id)
         except Exception as joberror:
             logger_job.error(joberror)
+        # If the removed stream is currently being recorded, stop the rec job and reset the rechead
+        if stream_id == rechead:
+            logger_job.warning(f'{stream_id} recording stopped.')
+            rec_id = f'rec_{stream_id}'
+            scheduler.remove_job(rec_id)
+            rechead = ""
         # Handle the situation where we remove an stream that is currently playing
-        if head['id'] == stream_id:
-            logger_job.warning(f'{stream_id} was currently running.')
+        if stream_id == playhead['id']:
+            logger_job.warning(f'{stream_id} was playing.')
             fallback = fallback_search(database)
             prio = 0
             logger_job.warning(f'Source priority is reset to 0')
-            scheduler.add_job(func=stream_exec, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
+            scheduler.add_job(func=exec_stream, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
 
 # Helper function to search for a fallback stream
 def fallback_search(database):
@@ -96,6 +106,7 @@ def fallback_search(database):
     hour_set = []
     for key, value in database.items():
         if value['start_at'] == "now" or value['start_at'] == "never":
+            # Do not use non-time-scheduled streams as fallbacks
            continue
         else:
             hour_set.append(value['start_at'])
@@ -115,38 +126,44 @@ def find_event_entry(epg, stream_name):
             return {"start_at": entry.get("start_at"), "prio": entry.get("prio")}
     return None
 
-# Helper function to update the head
-def update_head(stream_id, stream_prio, stream_hls_url):
-    global head
-    head = { "id": stream_id,
+# Helper function to update the playhead
+def update_playhead(stream_id, stream_prio, stream_hls_url):
+    global playhead
+    playhead = { "id": stream_id,
              "prio": stream_prio,
              "head": stream_hls_url }
-    logger_job.warning(f'Head position is: {str(head)}')
+    logger_job.warning(f'Playhead position is: {str(playhead)}')
 
-# Tasks
-def stream_exec(stream_id, stream_name, stream_prio, stream_hls_url):
+# Execute stream
+def exec_stream(stream_id, stream_name, stream_prio, stream_hls_url):
     global prio
     logger_job.warning(f'Hello {stream_name}!')
     if stream_prio > prio:
         prio = stream_prio
         logger_job.warning(f'Source priority is now set to: {prio}')
-        update_head(stream_id, stream_prio, stream_hls_url)
+        update_playhead(stream_id, stream_prio, stream_hls_url)
     elif stream_prio == prio:
-        update_head(stream_id, stream_prio, stream_hls_url)
+        update_playhead(stream_id, stream_prio, stream_hls_url)
     elif stream_prio < prio:
-        logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping head update!')
-
-def recorder(stream_hls_url):
-    global recorder
-    output_file = "test.mp4"
+        logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping playhead update!')
+# Execute recorder
+def exec_recorder(stream_id, stream_hls_url):
+    global rechead
+    current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")
+    output_file = current_datetime + ".mp4"
+    if rechead != "":
+        logger_job.error('Recorder is already started. Refusing to start another rec job.')
+        return False
+    else:
+        logger_job.warning(f'Starting recording job for {output_file}')
+        rechead = stream_id
     ffmpeg = (
         FFmpeg()
         .option("y")
         .input(stream_hls_url)
-        .output("output.mp4", vcodec="copy")
+        .output(output_file, vcodec="copy")
     )
-
     ffmpeg.execute()
 
 
 def core_api_sync():
@@ -209,8 +226,8 @@ scheduler.start()
 
 @app.route('/', methods=['GET'])
 def root_query():
-    global head
-    return jsonify(head)
+    global playhead
+    return jsonify(playhead)
 
 def create_app():
     return app