television/src/scheduler/app.py
2024-01-12 16:54:53 +02:00

233 lines
9.1 KiB
Python

import os
import logging
import json
import time
from datetime import datetime
from flask import Flask, render_template, jsonify, request
from apscheduler.schedulers.background import BackgroundScheduler
from core_client import Client
from ffmpeg import FFmpeg
app = Flask(__name__)
scheduler = BackgroundScheduler()
# Log handlers
logger_api = logging.getLogger('waitress')
logger_job = logging.getLogger('apscheduler')
log_level = os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper()
logger_api.setLevel(log_level)
logger_job.setLevel(log_level)
# Variables
CORE_SYNC_PERIOD = int(os.environ.get('CORE_SYNC_PERIOD', 15))
api_hostname = os.environ.get('CORE_API_HOSTNAME', 'stream.example.com')
api_username = os.environ.get('CORE_API_AUTH_USERNAME', 'admin')
api_password = os.environ.get('CORE_API_AUTH_PASSWORD', 'pass')
# Init
database = {}
prio = 0
playhead = {}
rechead = ""
with open('/config/epg.json', 'r') as epg_json:
epg = json.load(epg_json)
epg_json.close()
# Helper function to get process details
def get_core_process_details(client, process_id):
try:
return client.v3_process_get(id=process_id)
except Exception as err:
logger_job.error(f'Error getting process details for {process_id}: {err}')
return None
# Helper function to process a running channel
def process_running_channel(database, scheduler, stream_id, stream_name, stream_description, stream_hls_url):
global recording
if stream_id in database:
# Skip learned channels
return
else:
epg_result = find_event_entry(epg, stream_name)
stream_start = epg_result.get('start_at')
stream_prio = epg_result.get('prio', 0)
if stream_start == "never":
# Skip channels that are set to never start automatically
return
logger_job.warning(f'{stream_id} ({stream_name}) has been registered to the database')
if stream_start == "now":
logger_job.warning("Stream should start now")
scheduler.add_job(func=exec_stream, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
rec_id = f'rec_{stream_id}'
scheduler.add_job(func=exec_recorder, id=rec_id, args=(stream_id, stream_hls_url))
else:
logger_job.warning(f"Stream start hour is set to {stream_start}")
scheduler.add_job(
func=exec_stream, trigger='cron', hour=stream_start, jitter=60,
id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url)
)
database.update({stream_id: {'name': stream_name, 'start_at': stream_start, 'meta': stream_description, 'src': stream_hls_url}})
# Bootstrap the playhead if its still empty.
if playhead == {}:
fallback = fallback_search(database)
scheduler.add_job(func=exec_stream, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
# Helper function to remove channel from the database
def remove_channel_from_database(database, scheduler, stream_id, stream_name, state):
global prio
global playhead
global rechead
if stream_id in database:
logger_job.warning(f'{stream_id} ({stream_name}) will be removed from the database. Reason: {state.exec}')
database.pop(stream_id)
try:
scheduler.remove_job(stream_id)
except Exception as joberror:
logger_job.error(joberror)
# If the removed stream is currently recording we should stop the rec process and reset the rechead
if stream_id == rechead:
logger_job.warning(f'{stream_id} recording stopped.')
rec_id = f'rec_{stream_id}'
scheduler.remove_job(rec_id)
rechead = ""
# Handle the situation where we remove an stream that is currently playing
if stream_id == playhead['id']
logger_job.warning(f'{stream_id} was playing.')
fallback = fallback_search(database)
prio = 0
logger_job.warning(f'Source priority is reset to 0')
scheduler.add_job(func=exec_stream, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
# Helper function to search for a fallback stream
def fallback_search(database):
logger_job.warning('Searching for a fallback job.')
current_hour = datetime.now().hour
hour_set = []
for key, value in database.items():
if value['start_at'] == "now" or value['start_at'] == "never":
# do not use non-time scheduled streams as fallbacks
continue
else:
hour_set.append(value['start_at'])
closest_hour = min(hour_set, key=lambda item: abs(int(item) - current_hour))
for key, value in database.items():
if value['start_at'] == str(closest_hour):
fallback = { "stream_id": key,
"stream_name": value['name'],
"stream_hls_url": value['src']
}
return fallback
# Helper function to find match a stream name with epg.json
def find_event_entry(epg, stream_name):
for entry in epg:
if "name" in entry and entry["name"] == stream_name:
return {"start_at": entry.get("start_at"), "prio": entry.get("prio")}
return None
# Helper function to update the playhead
def update_playhead(stream_id, stream_prio, stream_hls_url):
global playhead
playhead = { "id": stream_id,
"prio": stream_prio,
"head": stream_hls_url }
logger_job.warning(f'Playhead position is: {str(playhead)}')
# Execute stream
def exec_stream(stream_id, stream_name, stream_prio, stream_hls_url):
global prio
logger_job.warning(f'Hello {stream_name}!')
if stream_prio > prio:
prio = stream_prio
logger_job.warning(f'Source priority is now set to: {prio}')
update_playhead(stream_id, stream_prio, stream_hls_url)
elif stream_prio == prio:
update_playhead(stream_id, stream_prio, stream_hls_url)
elif stream_prio < prio:
logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping playhead update!')
# Execute recorder
def exec_recorder(stream_id, stream_hls_url):
global rechead
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")
output_file = current_datetime + ".mp4"
if rechead != "":
logger_job.error('Recorder is already started. Refusing to start another rec job.')
return False
else:
logger_job.warning(f'Starting recording job for {output_file}')
rechead = stream_id
ffmpeg = (
FFmpeg()
.option("y")
.input(stream_hls_url)
.output(output_file, vcodec="copy")
)
ffmpeg.execute()
def core_api_sync():
global database
global epg
new_ids = []
try:
process_list = client.v3_process_get_list()
except Exception as err:
logger_job.error(f'Error getting process list: {err}')
return True
for process in process_list:
try:
get_process = get_core_process_details(client, process.id)
if not get_process:
continue
stream_id = get_process.reference
meta = get_process.metadata
state = get_process.state
except Exception as err:
logger_job.error(f'Error processing {process.id}: {err}')
continue
if meta is None or meta['restreamer-ui'].get('meta') is None:
# Skip processes without metadata or meta key
continue
new_ids.append(stream_id)
stream_name = meta['restreamer-ui']['meta']['name']
stream_description = meta['restreamer-ui']['meta']['description']
stream_storage_type = meta['restreamer-ui']['control']['hls']['storage']
stream_hls_url = f'https://{api_hostname}/{stream_storage_type}/{stream_id}.m3u8'
if state.exec == "running":
process_running_channel(database, scheduler, stream_id, stream_name, stream_description, stream_hls_url)
else:
remove_channel_from_database(database, scheduler, stream_id, stream_name, state)
new_ids.remove(stream_id)
# Cleanup orphaned references
orphan_keys = [key for key in database if key not in new_ids]
for orphan_key in orphan_keys:
logger_job.warning(f'Key {orphan_key} is an orphan. Removing.')
database.pop(orphan_key)
scheduler.remove_job(orphan_key)
# Login
# TODO fix logger_api
try:
client = Client(base_url='https://' + api_hostname, username=api_username, password=api_password)
logger_api.warning('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname)
client.login()
except Exception as err:
logger_api.error('Client login error')
logger_api.error(err)
# Schedule datarhei core api sync
scheduler.add_job(func=core_api_sync, trigger="interval", seconds=CORE_SYNC_PERIOD, id="core_api_sync")
scheduler.start()
@app.route('/', methods=['GET'])
def root_query():
global playhead
return jsonify(playhead)
def create_app():
return app