Merge branch 'main' of github.com:deflax/television
commit e14b575b97
1 changed file with 55 additions and 23 deletions
@@ -1,11 +1,12 @@
 import os
 import logging
 import json
+import time
 from datetime import datetime
 from flask import Flask, render_template, jsonify, request
 from apscheduler.schedulers.background import BackgroundScheduler
 from core_client import Client
-import time
+from ffmpeg import FFmpeg
 
 app = Flask(__name__)
 scheduler = BackgroundScheduler()
@@ -26,7 +27,8 @@ api_password = os.environ.get('CORE_API_AUTH_PASSWORD', 'pass')
 # Init
 database = {}
 prio = 0
-head = {}
+playhead = {}
+rechead = ""
 
 with open('/config/epg.json', 'r') as epg_json:
     epg = json.load(epg_json)
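
The file read here drives the whole schedule. A hypothetical sketch of the epg.json shape, inferred from the fields (name, start_at, prio) that find_event_entry() reads further down; the entries themselves are invented for illustration:

    import json

    # Invented example entries; only the field names come from the code.
    epg = json.loads('''
    [
      {"name": "morning-show", "start_at": "8",   "prio": 1},
      {"name": "all-day-loop", "start_at": "now", "prio": 0}
    ]
    ''')
    for entry in epg:
        print(entry["name"], entry.get("start_at"), entry.get("prio"))
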
@@ -42,6 +44,7 @@ def get_core_process_details(client, process_id):
 
 # Helper function to process a running channel
 def process_running_channel(database, scheduler, stream_id, stream_name, stream_description, stream_hls_url):
+    global recording
     if stream_id in database:
         # Skip learned channels
         return
@@ -55,23 +58,26 @@ def process_running_channel(database, scheduler, stream_id, stream_name, stream_
         logger_job.warning(f'{stream_id} ({stream_name}) has been registered to the database')
         if stream_start == "now":
             logger_job.warning("Stream should start now")
-            scheduler.add_job(func=stream_exec, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
+            scheduler.add_job(func=exec_stream, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
+            rec_id = f'rec_{stream_id}'
+            scheduler.add_job(func=exec_recorder, id=rec_id, args=(stream_id, stream_hls_url))
         else:
             logger_job.warning(f"Stream start hour is set to {stream_start}")
             scheduler.add_job(
-                func=stream_exec, trigger='cron', hour=stream_start, jitter=60,
+                func=exec_stream, trigger='cron', hour=stream_start, jitter=60,
                 id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url)
             )
         database.update({stream_id: {'name': stream_name, 'start_at': stream_start, 'meta': stream_description, 'src': stream_hls_url}})
         # Bootstrap the playhead if it's still empty.
-        if head == {}:
+        if playhead == {}:
             fallback = fallback_search(database)
-            scheduler.add_job(func=stream_exec, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
+            scheduler.add_job(func=exec_stream, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
 
 # Helper function to remove channel from the database
 def remove_channel_from_database(database, scheduler, stream_id, stream_name, state):
     global prio
-    global head
+    global playhead
+    global rechead
     if stream_id in database:
         logger_job.warning(f'{stream_id} ({stream_name}) will be removed from the database. Reason: {state.exec}')
         database.pop(stream_id)
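
Both branches above lean on APScheduler's add_job(): calling it without a trigger runs the job once, as soon as possible, while trigger='cron' with jitter offsets the start time by up to the given number of seconds. A minimal standalone sketch of the two patterns; the demo job and the hour value are assumptions, not part of the commit:

    import time
    from apscheduler.schedulers.background import BackgroundScheduler

    def demo_job(name):
        print(f'starting {name}')

    scheduler = BackgroundScheduler()
    scheduler.start()
    # No trigger: runs once, immediately (the "now" branch above).
    scheduler.add_job(func=demo_job, id='ch1', args=('channel-1',))
    # Cron trigger: runs daily at 20:00, spread by up to 60 s of jitter.
    scheduler.add_job(func=demo_job, trigger='cron', hour='20', jitter=60,
                      id='ch2', args=('channel-2',))
    time.sleep(2)
    scheduler.shutdown()
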
@@ -79,13 +85,19 @@ def remove_channel_from_database(database, scheduler, stream_id, stream_name, st
             scheduler.remove_job(stream_id)
         except Exception as joberror:
             logger_job.error(joberror)
+        # If the removed stream is currently recording, we should stop the rec process and reset the rechead
+        if stream_id == rechead:
+            logger_job.warning(f'{stream_id} recording stopped.')
+            rec_id = f'rec_{stream_id}'
+            scheduler.remove_job(rec_id)
+            rechead = ""
         # Handle the situation where we remove a stream that is currently playing
-        if head['id'] == stream_id:
-            logger_job.warning(f'{stream_id} was currently running.')
+        if stream_id == playhead['id']:
+            logger_job.warning(f'{stream_id} was playing.')
             fallback = fallback_search(database)
             prio = 0
             logger_job.warning(f'Source priority is reset to 0')
-            scheduler.add_job(func=stream_exec, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
+            scheduler.add_job(func=exec_stream, id="fallback", args=(fallback['stream_id'], fallback['stream_name'], 0, fallback['stream_hls_url']))
 
 # Helper function to search for a fallback stream
 def fallback_search(database):
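
One detail worth knowing about the id="fallback" job used above: APScheduler raises ConflictingIdError if a job is added while another job with the same id still exists. A sketch of the usual guard, shown as background only; the commit itself does not pass replace_existing:

    from apscheduler.schedulers.background import BackgroundScheduler

    def fallback_job():
        print('fallback running')

    scheduler = BackgroundScheduler()
    scheduler.start()
    # replace_existing=True swaps the stored job instead of raising
    # ConflictingIdError when the id is already taken.
    scheduler.add_job(func=fallback_job, id='fallback', replace_existing=True)
    scheduler.add_job(func=fallback_job, id='fallback', replace_existing=True)
    scheduler.shutdown()
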
@@ -94,6 +106,7 @@ def fallback_search(database):
     hour_set = []
     for key, value in database.items():
         if value['start_at'] == "now" or value['start_at'] == "never":
+            # do not use non-time scheduled streams as fallbacks
             continue
         else:
             hour_set.append(value['start_at'])
@@ -113,26 +126,45 @@ def find_event_entry(epg, stream_name):
             return {"start_at": entry.get("start_at"), "prio": entry.get("prio")}
     return None
 
-# Helper function to update the head
-def update_head(stream_id, stream_prio, stream_hls_url):
-    global head
-    head = { "id": stream_id,
+# Helper function to update the playhead
+def update_playhead(stream_id, stream_prio, stream_hls_url):
+    global playhead
+    playhead = { "id": stream_id,
         "prio": stream_prio,
         "head": stream_hls_url }
-    logger_job.warning(f'Head position is: {str(head)}')
+    logger_job.warning(f'Playhead position is: {str(playhead)}')
 
-# Tasks
-def stream_exec(stream_id, stream_name, stream_prio, stream_hls_url):
+# Execute stream
+def exec_stream(stream_id, stream_name, stream_prio, stream_hls_url):
     global prio
     logger_job.warning(f'Hello {stream_name}!')
     if stream_prio > prio:
         prio = stream_prio
         logger_job.warning(f'Source priority is now set to: {prio}')
-        update_head(stream_id, stream_prio, stream_hls_url)
+        update_playhead(stream_id, stream_prio, stream_hls_url)
     elif stream_prio == prio:
-        update_head(stream_id, stream_prio, stream_hls_url)
+        update_playhead(stream_id, stream_prio, stream_hls_url)
     elif stream_prio < prio:
-        logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping head update!')
+        logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping playhead update!')
 
+# Execute recorder
+def exec_recorder(stream_id, stream_hls_url):
+    global rechead
+    current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")
+    output_file = current_datetime + ".mp4"
+    if rechead != "":
+        logger_job.error('Recorder is already started. Refusing to start another rec job.')
+        return False
+    else:
+        logger_job.warning(f'Starting recording job for {output_file}')
+        rechead = stream_id
+        ffmpeg = (
+            FFmpeg()
+            .option("y")
+            .input(stream_hls_url)
+            .output(output_file, vcodec="copy")
+        )
+        ffmpeg.execute()
+
 def core_api_sync():
     global database
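
The new exec_recorder() relies on a fluent FFmpeg().option().input().output() chain, which matches the python-ffmpeg package (an assumption; this is a different library from ffmpeg-python). A minimal standalone sketch of the same remux with a placeholder source URL; note that execute() blocks the calling scheduler thread until ffmpeg exits:

    from datetime import datetime
    from ffmpeg import FFmpeg  # assumes: pip install python-ffmpeg

    output_file = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f") + ".mp4"
    ffmpeg = (
        FFmpeg()
        .option("y")                             # overwrite output if present
        .input("https://example.com/live.m3u8")  # placeholder HLS source
        .output(output_file, vcodec="copy")      # copy video as-is, no re-encode
    )
    ffmpeg.execute()  # blocks until the input stream ends
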
@@ -181,7 +213,7 @@ def core_api_sync():
 # TODO fix logger_api
 try:
     client = Client(base_url='https://' + api_hostname, username=api_username, password=api_password)
-    logger_api.info('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname)
+    logger_api.warning('Logging in to Datarhei Core API ' + api_username + '@' + api_hostname)
     client.login()
 except Exception as err:
     logger_api.error('Client login error')
@@ -194,8 +226,8 @@ scheduler.start()
 
 @app.route('/', methods=['GET'])
 def root_query():
-    global head
-    return jsonify(head)
+    global playhead
+    return jsonify(playhead)
 
 def create_app():
     return app
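
With the route above in place, the playhead can be polled over HTTP. A hypothetical client-side check; the host and port assume Flask's defaults and are not part of the commit:

    import json
    from urllib.request import urlopen

    # Expected shape per update_playhead(): {"id": ..., "prio": ..., "head": ...}
    with urlopen('http://127.0.0.1:5000/') as resp:
        playhead = json.load(resp)
    print(playhead.get('id'), playhead.get('prio'), playhead.get('head'))
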