From 06da63cd0c678e377d1c1724e9cb1cf5de43de67 Mon Sep 17 00:00:00 2001 From: deflax Date: Sun, 7 Jan 2024 02:08:25 +0200 Subject: [PATCH] refactor using chatgpt :] --- src/scheduler/app.py | 116 ++++++++++++++++++++++++------------------- 1 file changed, 64 insertions(+), 52 deletions(-) diff --git a/src/scheduler/app.py b/src/scheduler/app.py index 70194f8..7d28e40 100644 --- a/src/scheduler/app.py +++ b/src/scheduler/app.py @@ -35,7 +35,42 @@ epg_json.close() print(epg) logger_api.info(epg) -# Helper functions +# Helper function to get process details +def get_core_process_details(client, process_id): + try: + return client.v3_process_get(id=process_id) + except Exception as err: + logger_job.error(f'Error getting process details for {process_id}: {err}') + return None + +# Helper function to process a running channel +def process_running_channel(client, database, scheduler, stream_id, stream_name, stream_description, stream_hls_url): + if stream_id in database: + # Skip learned channels + return + else: + logger_job.info(f'{stream_id} ({stream_name}) has been registered to the database') + epg_result = find_event_entry(epg, stream_name) + logger_job.info(epg_result) + stream_prio = epg_result.get('prio', 0) + try: + stream_start_hour = epg_result['start_at'] + logger_job.info(f"Stream start hour is set to {stream_start_hour}") + scheduler.add_job( + func=stream_exec, trigger='cron', hour=stream_start_hour, jitter=60, + id=stream_id, args=(stream_name, stream_prio, stream_hls_url) + ) + except TypeError: + logger_job.info("Stream should start now") + scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url)) + database.update({stream_id: {'name': stream_name, 'meta': stream_description, 'src': stream_hls_url}}) + +# Helper function to remove channel from the database +def remove_channel_from_database(stream_id, stream_name, state, database, scheduler): + logger_job.info(f'{stream_id} ({stream_name}) has been removed from the database. Reason: {state.exec}') + database.pop(stream_id) + scheduler.remove_job(stream_id) + def find_event_entry(events, target_name): for entry in events: if "name" in entry and entry["name"] == target_name: @@ -52,77 +87,54 @@ def stream_exec(stream_name, stream_prio, stream_hls_url): head = { "head": stream_hls_url } logger_job.info('head position is: ' + str(head)) +# Main function for synchronizing with Datarhei Core API def core_api_sync(): global database global epg - global prio + new_ids = [] try: process_list = client.v3_process_get_list() except Exception as err: - logger_job.error('client.v3_process_get_list ' + err) + logger_job.error(f'Error getting process list: {err}') return True + for process in process_list: try: - get_process = client.v3_process_get(id=process.id) + get_process = get_core_process_details(client, process.id) + if not get_process: + continue + stream_id = get_process.reference meta = get_process.metadata state = get_process.state except Exception as err: - logger_job.error('client.v3_process_get ' + err) + logger_job.error(f'Error processing process {process.id}: {err}') continue - if meta is None: - # Skip processes without metadata + + if meta is None or meta['restreamer-ui'].get('meta') is None: + # Skip processes without metadata or meta key continue + + new_ids.append(stream_id) + stream_name = meta['restreamer-ui']['meta']['name'] + stream_description = meta['restreamer-ui']['meta']['description'] + stream_storage_type = meta['restreamer-ui']['control']['hls']['storage'] + stream_hls_url = f'https://{api_hostname}/{stream_storage_type}/{stream_id}.m3u8' + + payload = {stream_id: {'name': stream_name, 'meta': stream_description, 'src': stream_hls_url}} + + if state.exec == "running": + process_running_channel(client, database, scheduler, stream_id, stream_name, stream_description, stream_hls_url) else: - if meta['restreamer-ui'].get('meta') is None: - # Skip processes without meta key - #logger_job.warn('{} does not have a meta key'.format(stream_id)) - continue - new_ids.append(stream_id) - stream_name = meta['restreamer-ui']['meta']['name'] - stream_description = meta['restreamer-ui']['meta']['description'] - stream_storage_type = meta['restreamer-ui']['control']['hls']['storage'] - stream_hls_url = 'https://{}/{}/{}.m3u8'.format(api_hostname, stream_storage_type, stream_id) - payload = { stream_id: { 'name': stream_name, 'meta': stream_description, 'src': stream_hls_url } } - - if state.exec == "running": - # Register a running channel to the database - if stream_id in database: - # Skip learned channels - continue - else: - logger_job.info('{} ({}) has been registered to the database'.format(stream_id, stream_name)) - epg_result = find_event_entry(epg, stream_name) - logger_job.info(epg_result) - #stream_prio = epg_result['prio'] - stream_prio = 0 - try: - stream_start_hour = epg_result['start_at'] - logger_job.info("Stream start hour is set to " + stream_start_hour) - scheduler.add_job(func=stream_exec, trigger='cron', hour=stream_start_hour, jitter=60, id=stream_id, args=(stream_name, stream_prio, stream_hls_url)) - except TypeError: - logger_job.info("Stream should start now") - scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url)) - database.update(payload) - else: - # Remove from the database if the state is changed - if stream_id in database: - logger_job.info('{} ({}) has been removed from the database. Reason: {}'.format(stream_id, stream_name, state.exec)) - database.pop(stream_id) - scheduler.remove_job(stream_id) - new_ids.remove(stream_id) + remove_channel_from_database(stream_id, stream_name, state, database, scheduler) + # Cleanup orphaned references - orphan_keys = [] - for key in database: - if key in new_ids: - continue - else: - logger_job.info('Key {} is an orphan. Removing.'.format(key)) - orphan_keys.append(key) + orphan_keys = [key for key in database if key not in new_ids] for orphan_key in orphan_keys: + logger_job.info(f'Key {orphan_key} is an orphan. Removing.') database.pop(orphan_key) - scheduler.remove_job(stream_id) + scheduler.remove_job(orphan_key) def show_database(): global database