From c947b4b4112356076b65c94508ec6a57dd03932d Mon Sep 17 00:00:00 2001 From: deflax Date: Thu, 6 Feb 2025 13:06:32 +0000 Subject: [PATCH] Move the recorder away from the api --- docker-compose.yml | 10 ++-- src/api/api.py | 103 -------------------------------- src/api/requirements.txt | 3 +- src/discordbot/discordbot.py | 98 ++++++++++++++++++++++++++++++ src/discordbot/requirements.txt | 1 + 5 files changed, 106 insertions(+), 109 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 9d43344..772ffd9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3' - networks: net: external: false @@ -38,6 +36,8 @@ services: restart: "no" restreamer: + depends_on: + - "haproxy" image: datarhei/restreamer:2.12.0 env_file: - "variables.env" @@ -75,6 +75,8 @@ services: image: tv-discordbot:latest env_file: - "variables.env" + volumes: + - "./data/recorder:/recordings" restart: unless-stopped networks: - net @@ -82,8 +84,8 @@ services: - meta.role=discordbot icecast: - #depends_on: - # - "radiorelay" + depends_on: + - "api" build: ./src/icecast image: tv-icecast:latest env_file: diff --git a/src/api/api.py b/src/api/api.py index c1037b0..ec1d09c 100644 --- a/src/api/api.py +++ b/src/api/api.py @@ -10,7 +10,6 @@ from flask import Flask, render_template, jsonify, request, abort from flask.helpers import send_file, send_from_directory from apscheduler.schedulers.background import BackgroundScheduler from core_client import Client -from ffmpeg import FFmpeg, Progress app = Flask(__name__) scheduler = BackgroundScheduler() @@ -30,7 +29,6 @@ core_hostname = os.environ.get('CORE_API_HOSTNAME', 'stream.example.com') core_username = os.environ.get('CORE_API_AUTH_USERNAME', 'admin') core_password = os.environ.get('CORE_API_AUTH_PASSWORD', 'pass') core_sync_period = int(os.environ.get('CORE_SYNC_PERIOD', 15)) -hls_converter_period = 180 rec_path = "/recordings" enable_delay = 24 @@ -82,9 +80,6 @@ def process_running_channel(database, scheduler, stream_id, stream_name, stream_ logger_job.error(f'Stream {stream_name} cancelled after {req_counter} attempts.') return scheduler.add_job(func=exec_stream, id=stream_id, args=(stream_id, stream_name, stream_prio, stream_hls_url)) - if stream_prio == 2: - rec_id = f'rec_{stream_id}' - scheduler.add_job(func=exec_recorder, id=rec_id, args=(stream_id, stream_name, stream_hls_url)) else: logger_job.warning(f"Stream start hour is set to {stream_start}") scheduler.add_job( @@ -171,101 +166,6 @@ def exec_stream(stream_id, stream_name, stream_prio, stream_hls_url): elif stream_prio < prio: logger_job.warning(f'Source with higher priority ({prio}) is blocking. Skipping playhead update.') -# Execute recorder -def exec_recorder(stream_id, stream_name, stream_hls_url): - global rechead - current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S-%f") - video_file = current_datetime + ".mp4" - thumb_file = current_datetime + ".png" - if rechead != {}: - logger_job.error('Recorder is already started. Refusing to start another job.') - else: - logger_job.warning(f'Recording {video_file} started.') - rechead = { 'id': stream_id, - 'name': stream_name, - 'video': video_file, - 'thumb': thumb_file } - video_output = f'{rec_path}/live/{video_file}' - thumb_output = f'{rec_path}/live/{thumb_file}' - - try: - # Record a mp4 file - ffmpeg = ( - FFmpeg() - .option("y") - .input(stream_hls_url) - .output(video_output, - {"codec:v": "copy", "codec:a": "copy", "bsf:a": "aac_adtstoasc"}, - )) - @ffmpeg.on("progress") - def on_progress(progress: Progress): - print(progress) - ffmpeg.execute() - logger_job.warning(f'Recording of {video_file} finished.') - - except Exception as joberror: - logger_job.error(f'Recording of {video_file} failed!') - logger_job.error(joberror) - - else: - # Show Metadata - ffmpeg_metadata = ( - FFmpeg(executable="ffprobe") - .input(video_output, - print_format="json", - show_streams=None,) - ) - media = json.loads(ffmpeg_metadata.execute()) - logger_job.warning(f"# Video") - logger_job.warning(f"- Codec: {media['streams'][0]['codec_name']}") - logger_job.warning(f"- Resolution: {media['streams'][0]['width']} X {media['streams'][0]['height']}") - logger_job.warning(f"- Duration: {media['streams'][0]['duration']}") - logger_job.warning(f"# Audio") - logger_job.warning(f"- Codec: {media['streams'][1]['codec_name']}") - logger_job.warning(f"- Sample Rate: {media['streams'][1]['sample_rate']}") - logger_job.warning(f"- Duration: {media['streams'][1]['duration']}") - - thumb_skip_time = float(media['streams'][0]['duration']) // 2 - thumb_width = media['streams'][0]['width'] - - # Generate thumbnail image from the recorded mp4 file - ffmpeg_thumb = ( - FFmpeg() - .input(video_output, ss=thumb_skip_time) - .output(thumb_output, vf='scale={}:{}'.format(thumb_width, -1), vframes=1) - ) - ffmpeg_thumb.execute() - logger_job.warning(f'Thumbnail {thumb_file} created.') - - # When ready, move the recorded from the live dir to the archives and reset the rec head - os.rename(f'{video_output}', f'{rec_path}/vod/{video_file}') - os.rename(f'{thumb_output}', f'{rec_path}/thumb/{thumb_file}') - - finally: - # Reset the rechead - time.sleep(5) - rechead = {} - logger_job.warning(f'Rechead reset.') - -# HLS Converter -def hls_converter(): - directory = f'{rec_path}/vod/' - try: - # Check if the directory exists - if not os.path.exists(directory): - raise FileNotFoundError(f"The directory '{directory}' does not exist.") - - # Iterate through all entries in the directory - for entry in os.listdir(directory): - file_path = os.path.join(directory, entry) - if entry.lower().endswith('.mp4'): - input_file = file_path - break - #logger_job.warning(f'{input_file} found. Converting to HLS...') - - except Exception as e: - logger_job.error(e) - # Datarhei CORE API sync def core_api_sync(): global database @@ -328,9 +228,6 @@ except Exception as err: scheduler.add_job(func=core_api_sync, trigger='interval', seconds=core_sync_period, id='core_api_sync') scheduler.get_job('core_api_sync').modify(next_run_time=datetime.now()) -# Schedule HLS converter job -scheduler.add_job(func=hls_converter, trigger='interval', seconds=hls_converter_period, id='hls_converter') - # Start the scheduler scheduler.start() diff --git a/src/api/requirements.txt b/src/api/requirements.txt index 600b810..3de29ef 100644 --- a/src/api/requirements.txt +++ b/src/api/requirements.txt @@ -7,5 +7,4 @@ itsdangerous==2.2.0 Jinja2==3.1.5 MarkupSafe==3.0.2 waitress==3.0.2 -Werkzeug==3.1.3 -python-ffmpeg==2.0.12 \ No newline at end of file +Werkzeug==3.1.3 \ No newline at end of file diff --git a/src/discordbot/discordbot.py b/src/discordbot/discordbot.py index 13ddf7a..b029013 100644 --- a/src/discordbot/discordbot.py +++ b/src/discordbot/discordbot.py @@ -7,6 +7,7 @@ import requests import discord from discord.ext.commands import Bot, has_permissions, CheckFailure, has_role, MissingRole from apscheduler.schedulers.asyncio import AsyncIOScheduler +from ffmpeg import FFmpeg, Progress # Read env variables bot_token = os.environ.get('DISCORDBOT_TOKEN', 'token') @@ -37,6 +38,8 @@ log_level = os.environ.get('SCHEDULER_LOG_LEVEL', 'INFO').upper() logger_discord.setLevel(log_level) database = {} + +rec_path = "/recordings" rechead = {} # Bot functions @@ -205,5 +208,100 @@ async def announce_live_channel(stream_name, stream_meta): live_channel = bot.get_channel(int(live_channel_id)) await live_channel.send(f'{stream_name} is live! :satellite_orbital: {stream_meta}') +# Execute recorder +async def exec_recorder(stream_id, stream_name, stream_hls_url): + global rechead + current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S-%f") + video_file = current_datetime + ".mp4" + thumb_file = current_datetime + ".png" + if rechead != {}: + logger_job.error('Recorder is already started. Refusing to start another job.') + else: + logger_job.warning(f'Recording {video_file} started.') + rechead = { 'id': stream_id, + 'name': stream_name, + 'video': video_file, + 'thumb': thumb_file } + video_output = f'{rec_path}/live/{video_file}' + thumb_output = f'{rec_path}/live/{thumb_file}' + + try: + # Record a mp4 file + ffmpeg = ( + FFmpeg() + .option("y") + .input(stream_hls_url) + .output(video_output, + {"codec:v": "copy", "codec:a": "copy", "bsf:a": "aac_adtstoasc"}, + )) + @ffmpeg.on("progress") + def on_progress(progress: Progress): + print(progress) + ffmpeg.execute() + logger_job.warning(f'Recording of {video_file} finished.') + + except Exception as joberror: + logger_job.error(f'Recording of {video_file} failed!') + logger_job.error(joberror) + + else: + # Show Metadata + ffmpeg_metadata = ( + FFmpeg(executable="ffprobe") + .input(video_output, + print_format="json", + show_streams=None,) + ) + media = json.loads(ffmpeg_metadata.execute()) + logger_job.warning(f"# Video") + logger_job.warning(f"- Codec: {media['streams'][0]['codec_name']}") + logger_job.warning(f"- Resolution: {media['streams'][0]['width']} X {media['streams'][0]['height']}") + logger_job.warning(f"- Duration: {media['streams'][0]['duration']}") + logger_job.warning(f"# Audio") + logger_job.warning(f"- Codec: {media['streams'][1]['codec_name']}") + logger_job.warning(f"- Sample Rate: {media['streams'][1]['sample_rate']}") + logger_job.warning(f"- Duration: {media['streams'][1]['duration']}") + + thumb_skip_time = float(media['streams'][0]['duration']) // 2 + thumb_width = media['streams'][0]['width'] + + # Generate thumbnail image from the recorded mp4 file + ffmpeg_thumb = ( + FFmpeg() + .input(video_output, ss=thumb_skip_time) + .output(thumb_output, vf='scale={}:{}'.format(thumb_width, -1), vframes=1) + ) + ffmpeg_thumb.execute() + logger_job.warning(f'Thumbnail {thumb_file} created.') + + # When ready, move the recorded from the live dir to the archives and reset the rec head + os.rename(f'{video_output}', f'{rec_path}/vod/{video_file}') + os.rename(f'{thumb_output}', f'{rec_path}/thumb/{thumb_file}') + + finally: + # Reset the rechead + time.sleep(5) + rechead = {} + logger_job.warning(f'Rechead reset.') + +# HLS Converter +async def hls_converter(): + directory = f'{rec_path}/vod/' + try: + # Check if the directory exists + if not os.path.exists(directory): + raise FileNotFoundError(f"The directory '{directory}' does not exist.") + + # Iterate through all entries in the directory + for entry in os.listdir(directory): + file_path = os.path.join(directory, entry) + if entry.lower().endswith('.mp4'): + input_file = file_path + break + #logger_job.warning(f'{input_file} found. Converting to HLS...') + + except Exception as e: + logger_job.error(e) + # Run the bot with your token asyncio.run(bot.run(bot_token)) diff --git a/src/discordbot/requirements.txt b/src/discordbot/requirements.txt index 351e64d..78fd84d 100644 --- a/src/discordbot/requirements.txt +++ b/src/discordbot/requirements.txt @@ -2,3 +2,4 @@ APScheduler==3.11.0 requests==2.32.3 discord.py==2.4.0 audioop-lts==0.2.1 +python-ffmpeg==2.0.12 \ No newline at end of file