handle removing running stream

This commit is contained in:
deflax 2024-01-07 05:08:54 +02:00
parent 2426c898d2
commit b2181fe25c

View file

@ -58,48 +58,72 @@ def process_running_channel(database, scheduler, stream_id, stream_name, stream_
logger_job.info(f'{stream_id} ({stream_name}) has been registered to the database') logger_job.info(f'{stream_id} ({stream_name}) has been registered to the database')
if stream_start == "now": if stream_start == "now":
logger_job.info("Stream should start now") logger_job.info("Stream should start now")
scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url)) scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
else: else:
logger_job.info(f"Stream start hour is set to {stream_start}") logger_job.info(f"Stream start hour is set to {stream_start}")
scheduler.add_job( scheduler.add_job(
func=stream_exec, trigger='cron', hour=stream_start, jitter=60, func=stream_exec, trigger='cron', hour=stream_start, jitter=60,
id=stream_id, args=(stream_name, stream_prio, stream_hls_url) id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url)
) )
database.update({stream_id: {'name': stream_name, 'meta': stream_description, 'src': stream_hls_url}}) database.update({stream_id: {'name': stream_name, 'start_at': stream_start, 'meta': stream_description, 'src': stream_hls_url}})
# Helper function to remove channel from the database # Helper function to remove channel from the database
def remove_channel_from_database(database, scheduler, stream_id, stream_name, state): def remove_channel_from_database(database, scheduler, stream_id, stream_name, state):
global prio
global head
if stream_id in database: if stream_id in database:
logger_job.info(f'{stream_id} ({stream_name}) has been removed from the database. Reason: {state.exec}') logger_job.info(f'{stream_id} ({stream_name}) will be removed from the database. Reason: {state.exec}')
# Handle the situation where we remove an stream that is currently playing
if head['id'] == stream_id:
logger_job.warning(f'{stream_id} was currently running. Searching for a fallback job.')
current_hour = datetime.now().hour
hour_set = []
for key, value in database.items():
if value['start_at'] == "now" or value['start_at'] == "never":
continue
else:
hour_set.append(value['start_at'])
closest_hour = min(hour_set, key=lambda item: abs(item - current_hour))
for key, value in database.items():
if value['start_at'] == closest_hour:
fallback_stream_id = key
fallback_stream_name = value['name']
fallback_hls_url = value['src']
break
prio = 0
logger_job.info(f'Source priority is reset to 0')
scheduler.add_job(func=stream_exec, id="fallback", args=(fallback_stream_id, fallback_stream_name, 0, fallback_hls_url))
database.pop(stream_id) database.pop(stream_id)
try: try:
scheduler.remove_job(stream_id) scheduler.remove_job(stream_id)
except scheduler.jobstores.base.JobLookupError as je: except Exception as joberror:
logger_job.error(je) logger_job.error(joberror)
# Helper function to find match a stream name with epg.json # Helper function to find match a stream name with epg.json
def find_event_entry(events, target_name): def find_event_entry(epg, stream_name):
for entry in events: for entry in epg:
if "name" in entry and entry["name"] == target_name: if "name" in entry and entry["name"] == stream_name:
return {"start_at": entry.get("start_at"), "prio": entry.get("prio")} return {"start_at": entry.get("start_at"), "prio": entry.get("prio")}
return None return None
# Helper function to update the head # Helper function to update the head
def update_head(stream_hls_url): def update_head(stream_id, stream_prio, stream_hls_url):
head = { "head": stream_hls_url } global head
head = { "id": stream_id,
"prio": stream_prio,
"head": stream_hls_url }
logger_job.info(f'Head position is: {str(head)}') logger_job.info(f'Head position is: {str(head)}')
# Tasks # Tasks
def stream_exec(stream_name, stream_prio, stream_hls_url): def stream_exec(stream_id, stream_name, stream_prio, stream_hls_url):
global head
global prio global prio
logger_job.info(f'Hello {stream_name}!') logger_job.info(f'Hello {stream_name}!')
if stream_prio > prio: if stream_prio > prio:
logger_job.info(f'Source priority is now set to: {stream_prio}')
prio = stream_prio prio = stream_prio
update_head(stream_hls_url) logger_job.info(f'Source priority is now set to: {prio}')
update_head(stream_id, stream_prio, stream_hls_url)
elif stream_prio == prio: elif stream_prio == prio:
update_head(stream_hls_url) update_head(stream_id, stream_prio, stream_hls_url)
elif stream_prio < prio: elif stream_prio < prio:
logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping head update!') logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping head update!')
@ -113,7 +137,6 @@ def core_api_sync():
except Exception as err: except Exception as err:
logger_job.error(f'Error getting process list: {err}') logger_job.error(f'Error getting process list: {err}')
return True return True
for process in process_list: for process in process_list:
try: try:
get_process = get_core_process_details(client, process.id) get_process = get_core_process_details(client, process.id)
@ -159,7 +182,7 @@ try:
logger_api.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname) logger_api.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname)
client.login() client.login()
except Exception as err: except Exception as err:
logger_api.error('client login error') logger_api.error('Client login error')
logger_api.error(err) logger_api.error(err)
core_api_sync() core_api_sync()