Move the recorder away from the api
This commit is contained in:
parent
e7fca847e0
commit
c947b4b411
5 changed files with 106 additions and 109 deletions
|
@ -1,5 +1,3 @@
|
||||||
version: '3'
|
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
net:
|
net:
|
||||||
external: false
|
external: false
|
||||||
|
@ -38,6 +36,8 @@ services:
|
||||||
restart: "no"
|
restart: "no"
|
||||||
|
|
||||||
restreamer:
|
restreamer:
|
||||||
|
depends_on:
|
||||||
|
- "haproxy"
|
||||||
image: datarhei/restreamer:2.12.0
|
image: datarhei/restreamer:2.12.0
|
||||||
env_file:
|
env_file:
|
||||||
- "variables.env"
|
- "variables.env"
|
||||||
|
@ -75,6 +75,8 @@ services:
|
||||||
image: tv-discordbot:latest
|
image: tv-discordbot:latest
|
||||||
env_file:
|
env_file:
|
||||||
- "variables.env"
|
- "variables.env"
|
||||||
|
volumes:
|
||||||
|
- "./data/recorder:/recordings"
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
networks:
|
networks:
|
||||||
- net
|
- net
|
||||||
|
@ -82,8 +84,8 @@ services:
|
||||||
- meta.role=discordbot
|
- meta.role=discordbot
|
||||||
|
|
||||||
icecast:
|
icecast:
|
||||||
#depends_on:
|
depends_on:
|
||||||
# - "radiorelay"
|
- "api"
|
||||||
build: ./src/icecast
|
build: ./src/icecast
|
||||||
image: tv-icecast:latest
|
image: tv-icecast:latest
|
||||||
env_file:
|
env_file:
|
||||||
|
|
103
src/api/api.py
103
src/api/api.py
|
@ -10,7 +10,6 @@ from flask import Flask, render_template, jsonify, request, abort
|
||||||
from flask.helpers import send_file, send_from_directory
|
from flask.helpers import send_file, send_from_directory
|
||||||
from apscheduler.schedulers.background import BackgroundScheduler
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
from core_client import Client
|
from core_client import Client
|
||||||
from ffmpeg import FFmpeg, Progress
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
scheduler = BackgroundScheduler()
|
scheduler = BackgroundScheduler()
|
||||||
|
@ -30,7 +29,6 @@ core_hostname = os.environ.get('CORE_API_HOSTNAME', 'stream.example.com')
|
||||||
core_username = os.environ.get('CORE_API_AUTH_USERNAME', 'admin')
|
core_username = os.environ.get('CORE_API_AUTH_USERNAME', 'admin')
|
||||||
core_password = os.environ.get('CORE_API_AUTH_PASSWORD', 'pass')
|
core_password = os.environ.get('CORE_API_AUTH_PASSWORD', 'pass')
|
||||||
core_sync_period = int(os.environ.get('CORE_SYNC_PERIOD', 15))
|
core_sync_period = int(os.environ.get('CORE_SYNC_PERIOD', 15))
|
||||||
hls_converter_period = 180
|
|
||||||
|
|
||||||
rec_path = "/recordings"
|
rec_path = "/recordings"
|
||||||
enable_delay = 24
|
enable_delay = 24
|
||||||
|
@ -82,9 +80,6 @@ def process_running_channel(database, scheduler, stream_id, stream_name, stream_
|
||||||
logger_job.error(f'Stream {stream_name} cancelled after {req_counter} attempts.')
|
logger_job.error(f'Stream {stream_name} cancelled after {req_counter} attempts.')
|
||||||
return
|
return
|
||||||
scheduler.add_job(func=exec_stream, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
|
scheduler.add_job(func=exec_stream, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url))
|
||||||
if stream_prio == 2:
|
|
||||||
rec_id = f'rec_{stream_id}'
|
|
||||||
scheduler.add_job(func=exec_recorder, id=rec_id, args=(stream_id, stream_name, stream_hls_url))
|
|
||||||
else:
|
else:
|
||||||
logger_job.warning(f"Stream start hour is set to {stream_start}")
|
logger_job.warning(f"Stream start hour is set to {stream_start}")
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
|
@ -171,101 +166,6 @@ def exec_stream(stream_id, stream_name, stream_prio, stream_hls_url):
|
||||||
elif stream_prio < prio:
|
elif stream_prio < prio:
|
||||||
logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping playhead update.')
|
logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping playhead update.')
|
||||||
|
|
||||||
# Execute recorder
|
|
||||||
def exec_recorder(stream_id, stream_name, stream_hls_url):
|
|
||||||
global rechead
|
|
||||||
current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S-%f")
|
|
||||||
video_file = current_datetime + ".mp4"
|
|
||||||
thumb_file = current_datetime + ".png"
|
|
||||||
if rechead != {}:
|
|
||||||
logger_job.error('Recorder is already started. Refusing to start another job.')
|
|
||||||
else:
|
|
||||||
logger_job.warning(f'Recording {video_file} started.')
|
|
||||||
rechead = { 'id': stream_id,
|
|
||||||
'name': stream_name,
|
|
||||||
'video': video_file,
|
|
||||||
'thumb': thumb_file }
|
|
||||||
video_output = f'{rec_path}/live/{video_file}'
|
|
||||||
thumb_output = f'{rec_path}/live/{thumb_file}'
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Record a mp4 file
|
|
||||||
ffmpeg = (
|
|
||||||
FFmpeg()
|
|
||||||
.option("y")
|
|
||||||
.input(stream_hls_url)
|
|
||||||
.output(video_output,
|
|
||||||
{"codec:v": "copy", "codec:a": "copy", "bsf:a": "aac_adtstoasc"},
|
|
||||||
))
|
|
||||||
@ffmpeg.on("progress")
|
|
||||||
def on_progress(progress: Progress):
|
|
||||||
print(progress)
|
|
||||||
ffmpeg.execute()
|
|
||||||
logger_job.warning(f'Recording of {video_file} finished.')
|
|
||||||
|
|
||||||
except Exception as joberror:
|
|
||||||
logger_job.error(f'Recording of {video_file} failed!')
|
|
||||||
logger_job.error(joberror)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Show Metadata
|
|
||||||
ffmpeg_metadata = (
|
|
||||||
FFmpeg(executable="ffprobe")
|
|
||||||
.input(video_output,
|
|
||||||
print_format="json",
|
|
||||||
show_streams=None,)
|
|
||||||
)
|
|
||||||
media = json.loads(ffmpeg_metadata.execute())
|
|
||||||
logger_job.warning(f"# Video")
|
|
||||||
logger_job.warning(f"- Codec: {media['streams'][0]['codec_name']}")
|
|
||||||
logger_job.warning(f"- Resolution: {media['streams'][0]['width']} X {media['streams'][0]['height']}")
|
|
||||||
logger_job.warning(f"- Duration: {media['streams'][0]['duration']}")
|
|
||||||
logger_job.warning(f"# Audio")
|
|
||||||
logger_job.warning(f"- Codec: {media['streams'][1]['codec_name']}")
|
|
||||||
logger_job.warning(f"- Sample Rate: {media['streams'][1]['sample_rate']}")
|
|
||||||
logger_job.warning(f"- Duration: {media['streams'][1]['duration']}")
|
|
||||||
|
|
||||||
thumb_skip_time = float(media['streams'][0]['duration']) // 2
|
|
||||||
thumb_width = media['streams'][0]['width']
|
|
||||||
|
|
||||||
# Generate thumbnail image from the recorded mp4 file
|
|
||||||
ffmpeg_thumb = (
|
|
||||||
FFmpeg()
|
|
||||||
.input(video_output, ss=thumb_skip_time)
|
|
||||||
.output(thumb_output, vf='scale={}:{}'.format(thumb_width, -1), vframes=1)
|
|
||||||
)
|
|
||||||
ffmpeg_thumb.execute()
|
|
||||||
logger_job.warning(f'Thumbnail {thumb_file} created.')
|
|
||||||
|
|
||||||
# When ready, move the recorded from the live dir to the archives and reset the rec head
|
|
||||||
os.rename(f'{video_output}', f'{rec_path}/vod/{video_file}')
|
|
||||||
os.rename(f'{thumb_output}', f'{rec_path}/thumb/{thumb_file}')
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# Reset the rechead
|
|
||||||
time.sleep(5)
|
|
||||||
rechead = {}
|
|
||||||
logger_job.warning(f'Rechead reset.')
|
|
||||||
|
|
||||||
# HLS Converter
|
|
||||||
def hls_converter():
|
|
||||||
directory = f'{rec_path}/vod/'
|
|
||||||
try:
|
|
||||||
# Check if the directory exists
|
|
||||||
if not os.path.exists(directory):
|
|
||||||
raise FileNotFoundError(f"The directory '{directory}' does not exist.")
|
|
||||||
|
|
||||||
# Iterate through all entries in the directory
|
|
||||||
for entry in os.listdir(directory):
|
|
||||||
file_path = os.path.join(directory, entry)
|
|
||||||
if entry.lower().endswith('.mp4'):
|
|
||||||
input_file = file_path
|
|
||||||
break
|
|
||||||
#logger_job.warning(f'{input_file} found. Converting to HLS...')
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger_job.error(e)
|
|
||||||
|
|
||||||
# Datarhei CORE API sync
|
# Datarhei CORE API sync
|
||||||
def core_api_sync():
|
def core_api_sync():
|
||||||
global database
|
global database
|
||||||
|
@ -328,9 +228,6 @@ except Exception as err:
|
||||||
scheduler.add_job(func=core_api_sync, trigger='interval', seconds=core_sync_period, id='core_api_sync')
|
scheduler.add_job(func=core_api_sync, trigger='interval', seconds=core_sync_period, id='core_api_sync')
|
||||||
scheduler.get_job('core_api_sync').modify(next_run_time=datetime.now())
|
scheduler.get_job('core_api_sync').modify(next_run_time=datetime.now())
|
||||||
|
|
||||||
# Schedule HLS converter job
|
|
||||||
scheduler.add_job(func=hls_converter, trigger='interval', seconds=hls_converter_period, id='hls_converter')
|
|
||||||
|
|
||||||
# Start the scheduler
|
# Start the scheduler
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
|
|
|
@ -8,4 +8,3 @@ Jinja2==3.1.5
|
||||||
MarkupSafe==3.0.2
|
MarkupSafe==3.0.2
|
||||||
waitress==3.0.2
|
waitress==3.0.2
|
||||||
Werkzeug==3.1.3
|
Werkzeug==3.1.3
|
||||||
python-ffmpeg==2.0.12
|
|
|
@ -7,6 +7,7 @@ import requests
|
||||||
import discord
|
import discord
|
||||||
from discord.ext.commands import Bot, has_permissions, CheckFailure, has_role, MissingRole
|
from discord.ext.commands import Bot, has_permissions, CheckFailure, has_role, MissingRole
|
||||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||||
|
from ffmpeg import FFmpeg, Progress
|
||||||
|
|
||||||
# Read env variables
|
# Read env variables
|
||||||
bot_token = os.environ.get('DISCORDBOT_TOKEN', 'token')
|
bot_token = os.environ.get('DISCORDBOT_TOKEN', 'token')
|
||||||
|
@ -37,6 +38,8 @@ log_level = os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper()
|
||||||
logger_discord.setLevel(log_level)
|
logger_discord.setLevel(log_level)
|
||||||
|
|
||||||
database = {}
|
database = {}
|
||||||
|
|
||||||
|
rec_path = "/recordings"
|
||||||
rechead = {}
|
rechead = {}
|
||||||
|
|
||||||
# Bot functions
|
# Bot functions
|
||||||
|
@ -205,5 +208,100 @@ async def announce_live_channel(stream_name, stream_meta):
|
||||||
live_channel = bot.get_channel(int(live_channel_id))
|
live_channel = bot.get_channel(int(live_channel_id))
|
||||||
await live_channel.send(f'{stream_name} is live! :satellite_orbital: {stream_meta}')
|
await live_channel.send(f'{stream_name} is live! :satellite_orbital: {stream_meta}')
|
||||||
|
|
||||||
|
# Execute recorder
|
||||||
|
async def exec_recorder(stream_id, stream_name, stream_hls_url):
|
||||||
|
global rechead
|
||||||
|
current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S-%f")
|
||||||
|
video_file = current_datetime + ".mp4"
|
||||||
|
thumb_file = current_datetime + ".png"
|
||||||
|
if rechead != {}:
|
||||||
|
logger_job.error('Recorder is already started. Refusing to start another job.')
|
||||||
|
else:
|
||||||
|
logger_job.warning(f'Recording {video_file} started.')
|
||||||
|
rechead = { 'id': stream_id,
|
||||||
|
'name': stream_name,
|
||||||
|
'video': video_file,
|
||||||
|
'thumb': thumb_file }
|
||||||
|
video_output = f'{rec_path}/live/{video_file}'
|
||||||
|
thumb_output = f'{rec_path}/live/{thumb_file}'
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Record a mp4 file
|
||||||
|
ffmpeg = (
|
||||||
|
FFmpeg()
|
||||||
|
.option("y")
|
||||||
|
.input(stream_hls_url)
|
||||||
|
.output(video_output,
|
||||||
|
{"codec:v": "copy", "codec:a": "copy", "bsf:a": "aac_adtstoasc"},
|
||||||
|
))
|
||||||
|
@ffmpeg.on("progress")
|
||||||
|
def on_progress(progress: Progress):
|
||||||
|
print(progress)
|
||||||
|
ffmpeg.execute()
|
||||||
|
logger_job.warning(f'Recording of {video_file} finished.')
|
||||||
|
|
||||||
|
except Exception as joberror:
|
||||||
|
logger_job.error(f'Recording of {video_file} failed!')
|
||||||
|
logger_job.error(joberror)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Show Metadata
|
||||||
|
ffmpeg_metadata = (
|
||||||
|
FFmpeg(executable="ffprobe")
|
||||||
|
.input(video_output,
|
||||||
|
print_format="json",
|
||||||
|
show_streams=None,)
|
||||||
|
)
|
||||||
|
media = json.loads(ffmpeg_metadata.execute())
|
||||||
|
logger_job.warning(f"# Video")
|
||||||
|
logger_job.warning(f"- Codec: {media['streams'][0]['codec_name']}")
|
||||||
|
logger_job.warning(f"- Resolution: {media['streams'][0]['width']} X {media['streams'][0]['height']}")
|
||||||
|
logger_job.warning(f"- Duration: {media['streams'][0]['duration']}")
|
||||||
|
logger_job.warning(f"# Audio")
|
||||||
|
logger_job.warning(f"- Codec: {media['streams'][1]['codec_name']}")
|
||||||
|
logger_job.warning(f"- Sample Rate: {media['streams'][1]['sample_rate']}")
|
||||||
|
logger_job.warning(f"- Duration: {media['streams'][1]['duration']}")
|
||||||
|
|
||||||
|
thumb_skip_time = float(media['streams'][0]['duration']) // 2
|
||||||
|
thumb_width = media['streams'][0]['width']
|
||||||
|
|
||||||
|
# Generate thumbnail image from the recorded mp4 file
|
||||||
|
ffmpeg_thumb = (
|
||||||
|
FFmpeg()
|
||||||
|
.input(video_output, ss=thumb_skip_time)
|
||||||
|
.output(thumb_output, vf='scale={}:{}'.format(thumb_width, -1), vframes=1)
|
||||||
|
)
|
||||||
|
ffmpeg_thumb.execute()
|
||||||
|
logger_job.warning(f'Thumbnail {thumb_file} created.')
|
||||||
|
|
||||||
|
# When ready, move the recorded from the live dir to the archives and reset the rec head
|
||||||
|
os.rename(f'{video_output}', f'{rec_path}/vod/{video_file}')
|
||||||
|
os.rename(f'{thumb_output}', f'{rec_path}/thumb/{thumb_file}')
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Reset the rechead
|
||||||
|
time.sleep(5)
|
||||||
|
rechead = {}
|
||||||
|
logger_job.warning(f'Rechead reset.')
|
||||||
|
|
||||||
|
# HLS Converter
|
||||||
|
async def hls_converter():
|
||||||
|
directory = f'{rec_path}/vod/'
|
||||||
|
try:
|
||||||
|
# Check if the directory exists
|
||||||
|
if not os.path.exists(directory):
|
||||||
|
raise FileNotFoundError(f"The directory '{directory}' does not exist.")
|
||||||
|
|
||||||
|
# Iterate through all entries in the directory
|
||||||
|
for entry in os.listdir(directory):
|
||||||
|
file_path = os.path.join(directory, entry)
|
||||||
|
if entry.lower().endswith('.mp4'):
|
||||||
|
input_file = file_path
|
||||||
|
break
|
||||||
|
#logger_job.warning(f'{input_file} found. Converting to HLS...')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger_job.error(e)
|
||||||
|
|
||||||
# Run the bot with your token
|
# Run the bot with your token
|
||||||
asyncio.run(bot.run(bot_token))
|
asyncio.run(bot.run(bot_token))
|
||||||
|
|
|
@ -2,3 +2,4 @@ APScheduler==3.11.0
|
||||||
requests==2.32.3
|
requests==2.32.3
|
||||||
discord.py==2.4.0
|
discord.py==2.4.0
|
||||||
audioop-lts==0.2.1
|
audioop-lts==0.2.1
|
||||||
|
python-ffmpeg==2.0.12
|
Loading…
Add table
Reference in a new issue