diff --git a/src/scheduler/app.py b/src/scheduler/app.py index 7b813f5..780c83d 100644 --- a/src/scheduler/app.py +++ b/src/scheduler/app.py @@ -58,48 +58,72 @@ def process_running_channel(database, scheduler, stream_id, stream_name, stream_ logger_job.info(f'{stream_id} ({stream_name}) has been registered to the database') if stream_start == "now": logger_job.info("Stream should start now") - scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url)) + scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url)) else: logger_job.info(f"Stream start hour is set to {stream_start}") scheduler.add_job( func=stream_exec, trigger='cron', hour=stream_start, jitter=60, - id=stream_id, args=(stream_name, stream_prio, stream_hls_url) + id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url) ) - database.update({stream_id: {'name': stream_name, 'meta': stream_description, 'src': stream_hls_url}}) + database.update({stream_id: {'name': stream_name, 'start_at': stream_start, 'meta': stream_description, 'src': stream_hls_url}}) # Helper function to remove channel from the database def remove_channel_from_database(database, scheduler, stream_id, stream_name, state): + global prio + global head if stream_id in database: - logger_job.info(f'{stream_id} ({stream_name}) has been removed from the database. Reason: {state.exec}') + logger_job.info(f'{stream_id} ({stream_name}) will be removed from the database. Reason: {state.exec}') + # Handle the situation where we remove an stream that is currently playing + if head['id'] == stream_id: + logger_job.warning(f'{stream_id} was currently running. Searching for a fallback job.') + current_hour = datetime.now().hour + hour_set = [] + for key, value in database.items(): + if value['start_at'] == "now" or value['start_at'] == "never": + continue + else: + hour_set.append(value['start_at']) + closest_hour = min(hour_set, key=lambda item: abs(item - current_hour)) + for key, value in database.items(): + if value['start_at'] == closest_hour: + fallback_stream_id = key + fallback_stream_name = value['name'] + fallback_hls_url = value['src'] + break + prio = 0 + logger_job.info(f'Source priority is reset to 0') + scheduler.add_job(func=stream_exec, id="fallback", args=(fallback_stream_id, fallback_stream_name, 0, fallback_hls_url)) database.pop(stream_id) try: scheduler.remove_job(stream_id) - except scheduler.jobstores.base.JobLookupError as je: - logger_job.error(je) + except Exception as joberror: + logger_job.error(joberror) # Helper function to find match a stream name with epg.json -def find_event_entry(events, target_name): - for entry in events: - if "name" in entry and entry["name"] == target_name: +def find_event_entry(epg, stream_name): + for entry in epg: + if "name" in entry and entry["name"] == stream_name: return {"start_at": entry.get("start_at"), "prio": entry.get("prio")} return None # Helper function to update the head -def update_head(stream_hls_url): - head = { "head": stream_hls_url } +def update_head(stream_id, stream_prio, stream_hls_url): + global head + head = { "id": stream_id, + "prio": stream_prio, + "head": stream_hls_url } logger_job.info(f'Head position is: {str(head)}') # Tasks -def stream_exec(stream_name, stream_prio, stream_hls_url): - global head +def stream_exec(stream_id, stream_name, stream_prio, stream_hls_url): global prio logger_job.info(f'Hello {stream_name}!') if stream_prio > prio: - logger_job.info(f'Source priority is now set to: {stream_prio}') prio = stream_prio - update_head(stream_hls_url) + logger_job.info(f'Source priority is now set to: {prio}') + update_head(stream_id, stream_prio, stream_hls_url) elif stream_prio == prio: - update_head(stream_hls_url) + update_head(stream_id, stream_prio, stream_hls_url) elif stream_prio < prio: logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping head update!') @@ -113,7 +137,6 @@ def core_api_sync(): except Exception as err: logger_job.error(f'Error getting process list: {err}') return True - for process in process_list: try: get_process = get_core_process_details(client, process.id) @@ -159,7 +182,7 @@ try: logger_api.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname) client.login() except Exception as err: - logger_api.error('client login error') + logger_api.error('Client login error') logger_api.error(err) core_api_sync()