refactor using chatgpt :]
This commit is contained in:
parent
64373534d6
commit
06da63cd0c
1 changed files with 64 additions and 52 deletions
|
@ -35,7 +35,42 @@ epg_json.close()
|
||||||
print(epg)
|
print(epg)
|
||||||
logger_api.info(epg)
|
logger_api.info(epg)
|
||||||
|
|
||||||
# Helper functions
|
# Helper function to get process details
|
||||||
|
def get_core_process_details(client, process_id):
|
||||||
|
try:
|
||||||
|
return client.v3_process_get(id=process_id)
|
||||||
|
except Exception as err:
|
||||||
|
logger_job.error(f'Error getting process details for {process_id}: {err}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Helper function to process a running channel
|
||||||
|
def process_running_channel(client, database, scheduler, stream_id, stream_name, stream_description, stream_hls_url):
|
||||||
|
if stream_id in database:
|
||||||
|
# Skip learned channels
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
logger_job.info(f'{stream_id} ({stream_name}) has been registered to the database')
|
||||||
|
epg_result = find_event_entry(epg, stream_name)
|
||||||
|
logger_job.info(epg_result)
|
||||||
|
stream_prio = epg_result.get('prio', 0)
|
||||||
|
try:
|
||||||
|
stream_start_hour = epg_result['start_at']
|
||||||
|
logger_job.info(f"Stream start hour is set to {stream_start_hour}")
|
||||||
|
scheduler.add_job(
|
||||||
|
func=stream_exec, trigger='cron', hour=stream_start_hour, jitter=60,
|
||||||
|
id=stream_id, args=(stream_name, stream_prio, stream_hls_url)
|
||||||
|
)
|
||||||
|
except TypeError:
|
||||||
|
logger_job.info("Stream should start now")
|
||||||
|
scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url))
|
||||||
|
database.update({stream_id: {'name': stream_name, 'meta': stream_description, 'src': stream_hls_url}})
|
||||||
|
|
||||||
|
# Helper function to remove channel from the database
|
||||||
|
def remove_channel_from_database(stream_id, stream_name, state, database, scheduler):
|
||||||
|
logger_job.info(f'{stream_id} ({stream_name}) has been removed from the database. Reason: {state.exec}')
|
||||||
|
database.pop(stream_id)
|
||||||
|
scheduler.remove_job(stream_id)
|
||||||
|
|
||||||
def find_event_entry(events, target_name):
|
def find_event_entry(events, target_name):
|
||||||
for entry in events:
|
for entry in events:
|
||||||
if "name" in entry and entry["name"] == target_name:
|
if "name" in entry and entry["name"] == target_name:
|
||||||
|
@ -52,77 +87,54 @@ def stream_exec(stream_name, stream_prio, stream_hls_url):
|
||||||
head = { "head": stream_hls_url }
|
head = { "head": stream_hls_url }
|
||||||
logger_job.info('head position is: ' + str(head))
|
logger_job.info('head position is: ' + str(head))
|
||||||
|
|
||||||
|
# Main function for synchronizing with Datarhei Core API
|
||||||
def core_api_sync():
|
def core_api_sync():
|
||||||
global database
|
global database
|
||||||
global epg
|
global epg
|
||||||
global prio
|
|
||||||
new_ids = []
|
new_ids = []
|
||||||
try:
|
try:
|
||||||
process_list = client.v3_process_get_list()
|
process_list = client.v3_process_get_list()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger_job.error('client.v3_process_get_list ' + err)
|
logger_job.error(f'Error getting process list: {err}')
|
||||||
return True
|
return True
|
||||||
|
|
||||||
for process in process_list:
|
for process in process_list:
|
||||||
try:
|
try:
|
||||||
get_process = client.v3_process_get(id=process.id)
|
get_process = get_core_process_details(client, process.id)
|
||||||
|
if not get_process:
|
||||||
|
continue
|
||||||
|
|
||||||
stream_id = get_process.reference
|
stream_id = get_process.reference
|
||||||
meta = get_process.metadata
|
meta = get_process.metadata
|
||||||
state = get_process.state
|
state = get_process.state
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger_job.error('client.v3_process_get ' + err)
|
logger_job.error(f'Error processing process {process.id}: {err}')
|
||||||
continue
|
continue
|
||||||
if meta is None:
|
|
||||||
# Skip processes without metadata
|
if meta is None or meta['restreamer-ui'].get('meta') is None:
|
||||||
|
# Skip processes without metadata or meta key
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
new_ids.append(stream_id)
|
||||||
|
stream_name = meta['restreamer-ui']['meta']['name']
|
||||||
|
stream_description = meta['restreamer-ui']['meta']['description']
|
||||||
|
stream_storage_type = meta['restreamer-ui']['control']['hls']['storage']
|
||||||
|
stream_hls_url = f'https://{api_hostname}/{stream_storage_type}/{stream_id}.m3u8'
|
||||||
|
|
||||||
|
payload = {stream_id: {'name': stream_name, 'meta': stream_description, 'src': stream_hls_url}}
|
||||||
|
|
||||||
|
if state.exec == "running":
|
||||||
|
process_running_channel(client, database, scheduler, stream_id, stream_name, stream_description, stream_hls_url)
|
||||||
else:
|
else:
|
||||||
if meta['restreamer-ui'].get('meta') is None:
|
remove_channel_from_database(stream_id, stream_name, state, database, scheduler)
|
||||||
# Skip processes without meta key
|
|
||||||
#logger_job.warn('{} does not have a meta key'.format(stream_id))
|
|
||||||
continue
|
|
||||||
new_ids.append(stream_id)
|
|
||||||
stream_name = meta['restreamer-ui']['meta']['name']
|
|
||||||
stream_description = meta['restreamer-ui']['meta']['description']
|
|
||||||
stream_storage_type = meta['restreamer-ui']['control']['hls']['storage']
|
|
||||||
stream_hls_url = 'https://{}/{}/{}.m3u8'.format(api_hostname, stream_storage_type, stream_id)
|
|
||||||
payload = { stream_id: { 'name': stream_name, 'meta': stream_description, 'src': stream_hls_url } }
|
|
||||||
|
|
||||||
if state.exec == "running":
|
|
||||||
# Register a running channel to the database
|
|
||||||
if stream_id in database:
|
|
||||||
# Skip learned channels
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
logger_job.info('{} ({}) has been registered to the database'.format(stream_id, stream_name))
|
|
||||||
epg_result = find_event_entry(epg, stream_name)
|
|
||||||
logger_job.info(epg_result)
|
|
||||||
#stream_prio = epg_result['prio']
|
|
||||||
stream_prio = 0
|
|
||||||
try:
|
|
||||||
stream_start_hour = epg_result['start_at']
|
|
||||||
logger_job.info("Stream start hour is set to " + stream_start_hour)
|
|
||||||
scheduler.add_job(func=stream_exec, trigger='cron', hour=stream_start_hour, jitter=60, id=stream_id, args=(stream_name, stream_prio, stream_hls_url))
|
|
||||||
except TypeError:
|
|
||||||
logger_job.info("Stream should start now")
|
|
||||||
scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_name, stream_prio, stream_hls_url))
|
|
||||||
database.update(payload)
|
|
||||||
else:
|
|
||||||
# Remove from the database if the state is changed
|
|
||||||
if stream_id in database:
|
|
||||||
logger_job.info('{} ({}) has been removed from the database. Reason: {}'.format(stream_id, stream_name, state.exec))
|
|
||||||
database.pop(stream_id)
|
|
||||||
scheduler.remove_job(stream_id)
|
|
||||||
new_ids.remove(stream_id)
|
|
||||||
# Cleanup orphaned references
|
# Cleanup orphaned references
|
||||||
orphan_keys = []
|
orphan_keys = [key for key in database if key not in new_ids]
|
||||||
for key in database:
|
|
||||||
if key in new_ids:
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
logger_job.info('Key {} is an orphan. Removing.'.format(key))
|
|
||||||
orphan_keys.append(key)
|
|
||||||
for orphan_key in orphan_keys:
|
for orphan_key in orphan_keys:
|
||||||
|
logger_job.info(f'Key {orphan_key} is an orphan. Removing.')
|
||||||
database.pop(orphan_key)
|
database.pop(orphan_key)
|
||||||
scheduler.remove_job(stream_id)
|
scheduler.remove_job(orphan_key)
|
||||||
|
|
||||||
def show_database():
|
def show_database():
|
||||||
global database
|
global database
|
||||||
|
|
Loading…
Reference in a new issue