diff --git a/Changelog b/Changelog index f9cd44f184..2e034ff15a 100644 --- a/Changelog +++ b/Changelog @@ -4,6 +4,7 @@ releases are sorted from youngest to oldest. version : - Radiance HDR image support - ddagrab (Desktop Duplication) video capture filter +- ffmpeg -shortest_buf_duration option version 5.1: diff --git a/doc/ffmpeg.texi b/doc/ffmpeg.texi index 767df69b7f..b97496d315 100644 --- a/doc/ffmpeg.texi +++ b/doc/ffmpeg.texi @@ -1765,6 +1765,22 @@ Default value is 0. Enable bitexact mode for (de)muxer and (de/en)coder @item -shortest (@emph{output}) Finish encoding when the shortest output stream ends. + +Note that this option may require buffering frames, which introduces extra +latency. The maximum amount of this latency may be controlled with the +@code{-shortest_buf_duration} option. + +@item -shortest_buf_duration @var{duration} (@emph{output}) +The @code{-shortest} option may require buffering potentially large amounts +of data when at least one of the streams is "sparse" (i.e. has large gaps +between frames – this is typically the case for subtitles). + +This option controls the maximum duration of buffered frames in seconds. +Larger values may allow the @code{-shortest} option to produce more accurate +results, but increase memory use and latency. + +The default value is 10 seconds. + @item -dts_delta_threshold Timestamp discontinuity delta threshold. @item -dts_error_threshold @var{seconds} diff --git a/fftools/Makefile b/fftools/Makefile index 81ad6c4f4f..bc57ebe748 100644 --- a/fftools/Makefile +++ b/fftools/Makefile @@ -14,6 +14,8 @@ OBJS-ffmpeg += \ fftools/ffmpeg_hw.o \ fftools/ffmpeg_mux.o \ fftools/ffmpeg_opt.o \ + fftools/objpool.o \ + fftools/sync_queue.o \ define DOFFTOOL OBJS-$(1) += fftools/cmdutils.o fftools/opt_common.o fftools/$(1).o $(OBJS-$(1)-yes) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 50e17b1890..9b6bb3d759 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -104,6 +104,7 @@ #include "ffmpeg.h" #include "cmdutils.h" +#include "sync_queue.h" #include "libavutil/avassert.h" @@ -569,6 +570,7 @@ static void ffmpeg_cleanup(int ret) av_bsf_free(&ost->bsf_ctx); av_frame_free(&ost->filtered_frame); + av_frame_free(&ost->sq_frame); av_frame_free(&ost->last_frame); av_packet_free(&ost->pkt); av_dict_free(&ost->encoder_opts); @@ -691,13 +693,10 @@ static void update_benchmark(const char *fmt, ...) static void close_output_stream(OutputStream *ost) { OutputFile *of = output_files[ost->file_index]; - AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base; - ost->finished |= ENCODER_FINISHED; - if (of->shortest) { - int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q); - of->recording_time = FFMIN(of->recording_time, end); - } + + if (ost->sq_idx_encode >= 0) + sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); } /* @@ -726,10 +725,15 @@ static void output_packet(OutputFile *of, AVPacket *pkt, goto finish; while ((ret = av_bsf_receive_packet(ost->bsf_ctx, pkt)) >= 0) of_submit_packet(of, pkt, ost); + if (ret == AVERROR_EOF) + of_submit_packet(of, NULL, ost); if (ret == AVERROR(EAGAIN)) ret = 0; - } else if (!eof) - of_submit_packet(of, pkt, ost); + } else + of_submit_packet(of, eof ? NULL : pkt, ost); + + if (eof) + ost->finished |= MUXER_FINISHED; finish: if (ret < 0 && ret != AVERROR_EOF) { @@ -899,6 +903,7 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) if (frame) { ost->frames_encoded++; + ost->samples_encoded += frame->nb_samples; if (debug_ts) { av_log(NULL, AV_LOG_INFO, "encoder <- type:%s " @@ -971,6 +976,52 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) av_assert0(0); } +static int submit_encode_frame(OutputFile *of, OutputStream *ost, + AVFrame *frame) +{ + int ret; + + if (ost->sq_idx_encode < 0) + return encode_frame(of, ost, frame); + + if (frame) { + ret = av_frame_ref(ost->sq_frame, frame); + if (ret < 0) + return ret; + frame = ost->sq_frame; + } + + ret = sq_send(of->sq_encode, ost->sq_idx_encode, + SQFRAME(frame)); + if (ret < 0) { + if (frame) + av_frame_unref(frame); + if (ret != AVERROR_EOF) + return ret; + } + + while (1) { + AVFrame *enc_frame = ost->sq_frame; + + ret = sq_receive(of->sq_encode, ost->sq_idx_encode, + SQFRAME(enc_frame)); + if (ret == AVERROR_EOF) { + enc_frame = NULL; + } else if (ret < 0) { + return (ret == AVERROR(EAGAIN)) ? 0 : ret; + } + + ret = encode_frame(of, ost, enc_frame); + if (enc_frame) + av_frame_unref(enc_frame); + if (ret < 0) { + if (ret == AVERROR_EOF) + close_output_stream(ost); + return ret; + } + } +} + static void do_audio_out(OutputFile *of, OutputStream *ost, AVFrame *frame) { @@ -984,10 +1035,9 @@ static void do_audio_out(OutputFile *of, OutputStream *ost, if (frame->pts == AV_NOPTS_VALUE || audio_sync_method < 0) frame->pts = ost->sync_opts; ost->sync_opts = frame->pts + frame->nb_samples; - ost->samples_encoded += frame->nb_samples; - ret = encode_frame(of, ost, frame); - if (ret < 0) + ret = submit_encode_frame(of, ost, frame); + if (ret < 0 && ret != AVERROR_EOF) exit_program(1); } @@ -1151,15 +1201,18 @@ static void do_video_out(OutputFile *of, if (delta0 > 1.1) nb0_frames = llrintf(delta0 - 0.6); } + next_picture->pkt_duration = 1; break; case VSYNC_VFR: if (delta <= -0.6) nb_frames = 0; else if (delta > 0.6) ost->sync_opts = llrint(sync_ipts); + next_picture->pkt_duration = duration; break; case VSYNC_DROP: case VSYNC_PASSTHROUGH: + next_picture->pkt_duration = duration; ost->sync_opts = llrint(sync_ipts); break; default: @@ -1273,8 +1326,8 @@ static void do_video_out(OutputFile *of, av_log(NULL, AV_LOG_DEBUG, "Forced keyframe at time %f\n", pts_time); } - ret = encode_frame(of, ost, in_picture); - if (ret < 0) + ret = submit_encode_frame(of, ost, in_picture); + if (ret < 0 && ret != AVERROR_EOF) exit_program(1); ost->sync_opts++; @@ -1286,19 +1339,6 @@ static void do_video_out(OutputFile *of, av_frame_move_ref(ost->last_frame, next_picture); } -static void finish_output_stream(OutputStream *ost) -{ - OutputFile *of = output_files[ost->file_index]; - AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base; - - ost->finished = ENCODER_FINISHED | MUXER_FINISHED; - - if (of->shortest) { - int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q); - of->recording_time = FFMIN(of->recording_time, end); - } -} - /** * Get and encode new output from any of the filtergraphs, without causing * activity. @@ -1766,7 +1806,7 @@ static void flush_encoders(void) exit_program(1); } - finish_output_stream(ost); + output_packet(of, ost->pkt, ost, 1); } init_output_stream_wrapper(ost, NULL, 1); @@ -1775,7 +1815,7 @@ static void flush_encoders(void) if (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO) continue; - ret = encode_frame(of, ost, NULL); + ret = submit_encode_frame(of, ost, NULL); if (ret != AVERROR_EOF) exit_program(1); } @@ -3086,6 +3126,9 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame) break; } + if (ost->sq_idx_encode >= 0) + sq_set_tb(of->sq_encode, ost->sq_idx_encode, enc_ctx->time_base); + ost->mux_timebase = enc_ctx->time_base; return 0; @@ -3094,6 +3137,7 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame) static int init_output_stream(OutputStream *ost, AVFrame *frame, char *error, int error_len) { + OutputFile *of = output_files[ost->file_index]; int ret = 0; if (ost->encoding_needed) { @@ -3226,6 +3270,9 @@ static int init_output_stream(OutputStream *ost, AVFrame *frame, if (ret < 0) return ret; + if (ost->sq_idx_mux >= 0) + sq_set_tb(of->sq_mux, ost->sq_idx_mux, ost->mux_timebase); + ost->initialized = 1; ret = of_check_init(output_files[ost->file_index]); @@ -3930,8 +3977,10 @@ static int process_input(int file_index) OutputStream *ost = output_streams[j]; if (ost->source_index == ifile->ist_index + i && - (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE)) - finish_output_stream(ost); + (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE)) { + OutputFile *of = output_files[ost->file_index]; + output_packet(of, ost->pkt, ost, 1); + } } } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 090bf67d2d..58e093b2cb 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -26,6 +26,7 @@ #include #include "cmdutils.h" +#include "sync_queue.h" #include "libavformat/avformat.h" #include "libavformat/avio.h" @@ -151,6 +152,7 @@ typedef struct OptionsContext { int64_t limit_filesize; float mux_preload; float mux_max_delay; + float shortest_buf_duration; int shortest; int bitexact; @@ -484,6 +486,7 @@ typedef struct OutputStream { int64_t max_frames; AVFrame *filtered_frame; AVFrame *last_frame; + AVFrame *sq_frame; AVPacket *pkt; int64_t last_dropped; int64_t last_nb0_frames[3]; @@ -575,6 +578,9 @@ typedef struct OutputStream { /* frame encode sum of squared error values */ int64_t error[4]; + + int sq_idx_encode; + int sq_idx_mux; } OutputStream; typedef struct Muxer Muxer; @@ -585,6 +591,9 @@ typedef struct OutputFile { Muxer *mux; const AVOutputFormat *format; + SyncQueue *sq_encode; + SyncQueue *sq_mux; + AVFormatContext *ctx; int ost_index; /* index of the first stream in output_streams */ int64_t recording_time; ///< desired length of the resulting file in microseconds == AV_TIME_BASE units diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index a3350a73e9..453ccac912 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -20,6 +20,7 @@ #include #include "ffmpeg.h" +#include "sync_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" @@ -56,6 +57,8 @@ struct Muxer { int64_t limit_filesize; int64_t final_filesize; int header_written; + + AVPacket *sq_pkt; }; static int want_sdp = 1; @@ -72,13 +75,14 @@ static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; - AVPacket *tmp_pkt; + AVPacket *tmp_pkt = NULL; int ret; if (!av_fifo_can_write(ms->muxing_queue)) { size_t cur_size = av_fifo_can_read(ms->muxing_queue); + size_t pkt_size = pkt ? pkt->size : 0; unsigned int are_we_over_size = - (ms->muxing_queue_data_size + pkt->size) > ost->muxing_queue_data_threshold; + (ms->muxing_queue_data_size + pkt_size) > ost->muxing_queue_data_threshold; size_t limit = are_we_over_size ? ost->max_muxing_queue_size : SIZE_MAX; size_t new_size = FFMIN(2 * cur_size, limit); @@ -93,6 +97,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) return ret; } + if (pkt) { ret = av_packet_make_refcounted(pkt); if (ret < 0) return ret; @@ -103,6 +108,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) av_packet_move_ref(tmp_pkt, pkt); ms->muxing_queue_data_size += tmp_pkt->size; + } av_fifo_write(ms->muxing_queue, &tmp_pkt, 1); return 0; @@ -192,11 +198,44 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) } } +static void submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) +{ + if (ost->sq_idx_mux >= 0) { + int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt)); + if (ret < 0) { + if (pkt) + av_packet_unref(pkt); + if (ret == AVERROR_EOF) { + ost->finished |= MUXER_FINISHED; + return; + } else + exit_program(1); + } + + while (1) { + ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt)); + if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) + return; + else if (ret < 0) + exit_program(1); + + write_packet(of, output_streams[of->ost_index + ret], + of->mux->sq_pkt); + } + } else { + if (pkt) + write_packet(of, ost, pkt); + else + ost->finished |= MUXER_FINISHED; + } +} + void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) { AVStream *st = ost->st; int ret; + if (pkt) { /* * Audio encoders may split the packets -- #frames in != #packets out. * But there is no reordering, so we can limit the number of output packets @@ -211,9 +250,10 @@ void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost) } ost->frame_number++; } + } if (of->mux->header_written) { - write_packet(of, ost, pkt); + submit_packet(of, ost, pkt); } else { /* the muxer is not initialized yet, buffer the packet */ ret = queue_packet(of, ost, pkt); @@ -321,9 +361,11 @@ int of_check_init(OutputFile *of) ost->mux_timebase = ost->st->time_base; while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { - ms->muxing_queue_data_size -= pkt->size; - write_packet(of, ost, pkt); - av_packet_free(&pkt); + submit_packet(of, ost, pkt); + if (pkt) { + ms->muxing_queue_data_size -= pkt->size; + av_packet_free(&pkt); + } } } @@ -383,6 +425,8 @@ static void mux_free(Muxer **pmux, int nb_streams) av_freep(&mux->streams); av_dict_free(&mux->opts); + av_packet_free(&mux->sq_pkt); + av_freep(pmux); } @@ -394,6 +438,9 @@ void of_close(OutputFile **pof) if (!of) return; + sq_free(&of->sq_encode); + sq_free(&of->sq_mux); + s = of->ctx; mux_free(&of->mux, s ? s->nb_streams : 0); @@ -437,6 +484,14 @@ int of_muxer_init(OutputFile *of, AVDictionary *opts, int64_t limit_filesize) if (strcmp(of->format->name, "rtp")) want_sdp = 0; + if (of->sq_mux) { + mux->sq_pkt = av_packet_alloc(); + if (!mux->sq_pkt) { + ret = AVERROR(ENOMEM); + goto fail; + } + } + /* write the header for files with no streams */ if (of->format->flags & AVFMT_NOSTREAMS && of->ctx->nb_streams == 0) { ret = of_check_init(of); diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index db8ec33cde..4281644cfc 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -31,6 +31,7 @@ #include "fopen_utf8.h" #include "cmdutils.h" #include "opt_common.h" +#include "sync_queue.h" #include "libavformat/avformat.h" @@ -236,6 +237,7 @@ static void init_options(OptionsContext *o) o->accurate_seek = 1; o->thread_queue_size = -1; o->input_sync_ref = -1; + o->shortest_buf_duration = 10.f; } static int show_hwaccels(void *optctx, const char *opt, const char *arg) @@ -2385,6 +2387,78 @@ static int init_complex_filters(void) return 0; } +static int setup_sync_queues(OutputFile *of, AVFormatContext *oc, int64_t buf_size_us) +{ + int nb_av_enc = 0, nb_interleaved = 0; + +#define IS_AV_ENC(ost, type) \ + (ost->encoding_needed && (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) +#define IS_INTERLEAVED(type) (type != AVMEDIA_TYPE_ATTACHMENT) + + for (int i = 0; i < oc->nb_streams; i++) { + OutputStream *ost = output_streams[of->ost_index + i]; + enum AVMediaType type = ost->st->codecpar->codec_type; + + ost->sq_idx_encode = -1; + ost->sq_idx_mux = -1; + + nb_interleaved += IS_INTERLEAVED(type); + nb_av_enc += IS_AV_ENC(ost, type); + } + + if (!(nb_interleaved > 1 && of->shortest)) + return 0; + + /* if we have more than one encoded audio/video streams, then we + * synchronize them before encoding */ + if (nb_av_enc > 1) { + of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us); + if (!of->sq_encode) + return AVERROR(ENOMEM); + + for (int i = 0; i < oc->nb_streams; i++) { + OutputStream *ost = output_streams[of->ost_index + i]; + enum AVMediaType type = ost->st->codecpar->codec_type; + + if (!IS_AV_ENC(ost, type)) + continue; + + ost->sq_idx_encode = sq_add_stream(of->sq_encode); + if (ost->sq_idx_encode < 0) + return ost->sq_idx_encode; + + ost->sq_frame = av_frame_alloc(); + if (!ost->sq_frame) + return AVERROR(ENOMEM); + } + } + + /* if there are any additional interleaved streams, then ALL the streams + * are also synchronized before sending them to the muxer */ + if (nb_interleaved > nb_av_enc) { + of->sq_mux = sq_alloc(SYNC_QUEUE_PACKETS, buf_size_us); + if (!of->sq_mux) + return AVERROR(ENOMEM); + + for (int i = 0; i < oc->nb_streams; i++) { + OutputStream *ost = output_streams[of->ost_index + i]; + enum AVMediaType type = ost->st->codecpar->codec_type; + + if (!IS_INTERLEAVED(type)) + continue; + + ost->sq_idx_mux = sq_add_stream(of->sq_mux); + if (ost->sq_idx_mux < 0) + return ost->sq_idx_mux; + } + } + +#undef IS_AV_ENC +#undef IS_INTERLEAVED + + return 0; +} + static int open_output_file(OptionsContext *o, const char *filename) { AVFormatContext *oc; @@ -3022,6 +3096,12 @@ loop_end: exit_program(1); } + err = setup_sync_queues(of, oc, o->shortest_buf_duration * AV_TIME_BASE); + if (err < 0) { + av_log(NULL, AV_LOG_FATAL, "Error setting up output sync queues\n"); + exit_program(1); + } + err = of_muxer_init(of, format_opts, o->limit_filesize); if (err < 0) { av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n"); @@ -3739,6 +3819,8 @@ const OptionDef options[] = { { "shortest", OPT_BOOL | OPT_EXPERT | OPT_OFFSET | OPT_OUTPUT, { .off = OFFSET(shortest) }, "finish encoding within shortest input" }, + { "shortest_buf_duration", HAS_ARG | OPT_FLOAT | OPT_EXPERT | OPT_OFFSET | OPT_OUTPUT, { .off = OFFSET(shortest_buf_duration) }, + "maximum buffering duration (in seconds) for the -shortest option" }, { "bitexact", OPT_BOOL | OPT_EXPERT | OPT_OFFSET | OPT_OUTPUT | OPT_INPUT, { .off = OFFSET(bitexact) }, "bitexact mode" }, diff --git a/fftools/sync_queue.c b/fftools/sync_queue.c new file mode 100644 index 0000000000..ab654ca790 --- /dev/null +++ b/fftools/sync_queue.c @@ -0,0 +1,425 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include + +#include "libavutil/avassert.h" +#include "libavutil/error.h" +#include "libavutil/fifo.h" +#include "libavutil/mathematics.h" +#include "libavutil/mem.h" + +#include "objpool.h" +#include "sync_queue.h" + +typedef struct SyncQueueStream { + AVFifo *fifo; + AVRational tb; + + /* stream head: largest timestamp seen */ + int64_t head_ts; + /* no more frames will be sent for this stream */ + int finished; +} SyncQueueStream; + +struct SyncQueue { + enum SyncQueueType type; + + /* no more frames will be sent for any stream */ + int finished; + /* sync head: the stream with the _smallest_ head timestamp + * this stream determines which frames can be output */ + int head_stream; + /* the finished stream with the smallest finish timestamp or -1 */ + int head_finished_stream; + + // maximum buffering duration in microseconds + int64_t buf_size_us; + + SyncQueueStream *streams; + unsigned int nb_streams; + + // pool of preallocated frames to avoid constant allocations + ObjPool *pool; +}; + +static void frame_move(const SyncQueue *sq, SyncQueueFrame dst, + SyncQueueFrame src) +{ + if (sq->type == SYNC_QUEUE_PACKETS) + av_packet_move_ref(dst.p, src.p); + else + av_frame_move_ref(dst.f, src.f); +} + +static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame) +{ + return (sq->type == SYNC_QUEUE_PACKETS) ? + frame.p->pts + frame.p->duration : + frame.f->pts + frame.f->pkt_duration; +} + +static int frame_null(const SyncQueue *sq, SyncQueueFrame frame) +{ + return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL); +} + +static void finish_stream(SyncQueue *sq, unsigned int stream_idx) +{ + SyncQueueStream *st = &sq->streams[stream_idx]; + + st->finished = 1; + + if (st->head_ts != AV_NOPTS_VALUE) { + /* check if this stream is the new finished head */ + if (sq->head_finished_stream < 0 || + av_compare_ts(st->head_ts, st->tb, + sq->streams[sq->head_finished_stream].head_ts, + sq->streams[sq->head_finished_stream].tb) < 0) { + sq->head_finished_stream = stream_idx; + } + + /* mark as finished all streams that should no longer receive new frames, + * due to them being ahead of some finished stream */ + st = &sq->streams[sq->head_finished_stream]; + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueStream *st1 = &sq->streams[i]; + if (st != st1 && st1->head_ts != AV_NOPTS_VALUE && + av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0) + st1->finished = 1; + } + } + + /* mark the whole queue as finished if all streams are finished */ + for (unsigned int i = 0; i < sq->nb_streams; i++) { + if (!sq->streams[i].finished) + return; + } + sq->finished = 1; +} + +static void queue_head_update(SyncQueue *sq) +{ + if (sq->head_stream < 0) { + /* wait for one timestamp in each stream before determining + * the queue head */ + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueStream *st = &sq->streams[i]; + if (st->head_ts == AV_NOPTS_VALUE) + return; + } + + // placeholder value, correct one will be found below + sq->head_stream = 0; + } + + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueStream *st_head = &sq->streams[sq->head_stream]; + SyncQueueStream *st_other = &sq->streams[i]; + if (st_other->head_ts != AV_NOPTS_VALUE && + av_compare_ts(st_other->head_ts, st_other->tb, + st_head->head_ts, st_head->tb) < 0) + sq->head_stream = i; + } +} + +/* update this stream's head timestamp */ +static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts) +{ + SyncQueueStream *st = &sq->streams[stream_idx]; + + if (ts == AV_NOPTS_VALUE || + (st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts)) + return; + + st->head_ts = ts; + + /* if this stream is now ahead of some finished stream, then + * this stream is also finished */ + if (sq->head_finished_stream >= 0 && + av_compare_ts(sq->streams[sq->head_finished_stream].head_ts, + sq->streams[sq->head_finished_stream].tb, + ts, st->tb) <= 0) + finish_stream(sq, stream_idx); + + /* update the overall head timestamp if it could have changed */ + if (sq->head_stream < 0 || sq->head_stream == stream_idx) + queue_head_update(sq); +} + +/* If the queue for the given stream (or all streams when stream_idx=-1) + * is overflowing, trigger a fake heartbeat on lagging streams. + * + * @return 1 if heartbeat triggered, 0 otherwise + */ +static int overflow_heartbeat(SyncQueue *sq, int stream_idx) +{ + SyncQueueStream *st; + SyncQueueFrame frame; + int64_t tail_ts = AV_NOPTS_VALUE; + + /* if no stream specified, pick the one that is most ahead */ + if (stream_idx < 0) { + int64_t ts = AV_NOPTS_VALUE; + + for (int i = 0; i < sq->nb_streams; i++) { + st = &sq->streams[i]; + if (st->head_ts != AV_NOPTS_VALUE && + (ts == AV_NOPTS_VALUE || + av_compare_ts(ts, sq->streams[stream_idx].tb, + st->head_ts, st->tb) < 0)) { + ts = st->head_ts; + stream_idx = i; + } + } + /* no stream has a timestamp yet -> nothing to do */ + if (stream_idx < 0) + return 0; + } + + st = &sq->streams[stream_idx]; + + /* get the chosen stream's tail timestamp */ + for (size_t i = 0; tail_ts == AV_NOPTS_VALUE && + av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++) + tail_ts = frame_ts(sq, frame); + + /* overflow triggers when the tail is over specified duration behind the head */ + if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts || + av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us) + return 0; + + /* signal a fake timestamp for all streams that prevent tail_ts from being output */ + tail_ts++; + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueStream *st1 = &sq->streams[i]; + int64_t ts; + + if (st == st1 || st1->finished || + (st1->head_ts != AV_NOPTS_VALUE && + av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0)) + continue; + + ts = av_rescale_q(tail_ts, st->tb, st1->tb); + if (st1->head_ts != AV_NOPTS_VALUE) + ts = FFMAX(st1->head_ts + 1, ts); + + stream_update_ts(sq, i, ts); + } + + return 1; +} + +int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame) +{ + SyncQueueStream *st; + SyncQueueFrame dst; + int64_t ts; + int ret; + + av_assert0(stream_idx < sq->nb_streams); + st = &sq->streams[stream_idx]; + + av_assert0(st->tb.num > 0 && st->tb.den > 0); + + if (frame_null(sq, frame)) { + finish_stream(sq, stream_idx); + return 0; + } + if (st->finished) + return AVERROR_EOF; + + ret = objpool_get(sq->pool, (void**)&dst); + if (ret < 0) + return ret; + + frame_move(sq, dst, frame); + + ts = frame_ts(sq, dst); + + ret = av_fifo_write(st->fifo, &dst, 1); + if (ret < 0) { + frame_move(sq, frame, dst); + objpool_release(sq->pool, (void**)&dst); + return ret; + } + + stream_update_ts(sq, stream_idx, ts); + + return 0; +} + +static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx, + SyncQueueFrame frame) +{ + SyncQueueStream *st_head = sq->head_stream >= 0 ? + &sq->streams[sq->head_stream] : NULL; + SyncQueueStream *st; + + av_assert0(stream_idx < sq->nb_streams); + st = &sq->streams[stream_idx]; + + if (av_fifo_can_read(st->fifo)) { + SyncQueueFrame peek; + int64_t ts; + int cmp = 1; + + av_fifo_peek(st->fifo, &peek, 1, 0); + ts = frame_ts(sq, peek); + + /* check if this stream's tail timestamp does not overtake + * the overall queue head */ + if (ts != AV_NOPTS_VALUE && st_head) + cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb); + + /* We can release frames that do not end after the queue head. + * Frames with no timestamps are just passed through with no conditions. + */ + if (cmp <= 0 || ts == AV_NOPTS_VALUE) { + frame_move(sq, frame, peek); + objpool_release(sq->pool, (void**)&peek); + av_fifo_drain2(st->fifo, 1); + return 0; + } + } + + return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ? + AVERROR_EOF : AVERROR(EAGAIN); +} + +static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame) +{ + int nb_eof = 0; + int ret; + + /* read a frame for a specific stream */ + if (stream_idx >= 0) { + ret = receive_for_stream(sq, stream_idx, frame); + return (ret < 0) ? ret : stream_idx; + } + + /* read a frame for any stream with available output */ + for (unsigned int i = 0; i < sq->nb_streams; i++) { + ret = receive_for_stream(sq, i, frame); + if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) { + nb_eof += (ret == AVERROR_EOF); + continue; + } + return (ret < 0) ? ret : i; + } + + return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN); +} + +int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame) +{ + int ret = receive_internal(sq, stream_idx, frame); + + /* try again if the queue overflowed and triggered a fake heartbeat + * for lagging streams */ + if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx)) + ret = receive_internal(sq, stream_idx, frame); + + return ret; +} + +int sq_add_stream(SyncQueue *sq) +{ + SyncQueueStream *tmp, *st; + + tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams)); + if (!tmp) + return AVERROR(ENOMEM); + sq->streams = tmp; + + st = &sq->streams[sq->nb_streams]; + memset(st, 0, sizeof(*st)); + + st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW); + if (!st->fifo) + return AVERROR(ENOMEM); + + /* we set a valid default, so that a pathological stream that never + * receives even a real timebase (and no frames) won't stall all other + * streams forever; cf. overflow_heartbeat() */ + st->tb = (AVRational){ 1, 1 }; + st->head_ts = AV_NOPTS_VALUE; + + return sq->nb_streams++; +} + +void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb) +{ + SyncQueueStream *st; + + av_assert0(stream_idx < sq->nb_streams); + st = &sq->streams[stream_idx]; + + av_assert0(!av_fifo_can_read(st->fifo)); + + if (st->head_ts != AV_NOPTS_VALUE) + st->head_ts = av_rescale_q(st->head_ts, st->tb, tb); + + st->tb = tb; +} + +SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us) +{ + SyncQueue *sq = av_mallocz(sizeof(*sq)); + + if (!sq) + return NULL; + + sq->type = type; + sq->buf_size_us = buf_size_us; + + sq->head_stream = -1; + sq->head_finished_stream = -1; + + sq->pool = (type == SYNC_QUEUE_PACKETS) ? objpool_alloc_packets() : + objpool_alloc_frames(); + if (!sq->pool) { + av_freep(&sq); + return NULL; + } + + return sq; +} + +void sq_free(SyncQueue **psq) +{ + SyncQueue *sq = *psq; + + if (!sq) + return; + + for (unsigned int i = 0; i < sq->nb_streams; i++) { + SyncQueueFrame frame; + while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0) + objpool_release(sq->pool, (void**)&frame); + + av_fifo_freep2(&sq->streams[i].fifo); + } + + av_freep(&sq->streams); + + objpool_free(&sq->pool); + + av_freep(psq); +} diff --git a/fftools/sync_queue.h b/fftools/sync_queue.h new file mode 100644 index 0000000000..e08780b7bf --- /dev/null +++ b/fftools/sync_queue.h @@ -0,0 +1,100 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef FFTOOLS_SYNC_QUEUE_H +#define FFTOOLS_SYNC_QUEUE_H + +#include + +#include "libavcodec/packet.h" + +#include "libavutil/frame.h" + +enum SyncQueueType { + SYNC_QUEUE_PACKETS, + SYNC_QUEUE_FRAMES, +}; + +typedef union SyncQueueFrame { + AVFrame *f; + AVPacket *p; +} SyncQueueFrame; + +#define SQFRAME(frame) ((SyncQueueFrame){ .f = (frame) }) +#define SQPKT(pkt) ((SyncQueueFrame){ .p = (pkt) }) + +typedef struct SyncQueue SyncQueue; + +/** + * Allocate a sync queue of the given type. + * + * @param buf_size_us maximum duration that will be buffered in microseconds + */ +SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us); +void sq_free(SyncQueue **sq); + +/** + * Add a new stream to the sync queue. + * + * @return + * - a non-negative stream index on success + * - a negative error code on error + */ +int sq_add_stream(SyncQueue *sq); + +/** + * Set the timebase for the stream with index stream_idx. Should be called + * before sending any frames for this stream. + */ +void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb); + +/** + * Submit a frame for the stream with index stream_idx. + * + * On success, the sync queue takes ownership of the frame and will reset the + * contents of the supplied frame. On failure, the frame remains owned by the + * caller. + * + * Sending a frame with NULL contents marks the stream as finished. + * + * @return + * - 0 on success + * - AVERROR_EOF when no more frames should be submitted for this stream + * - another a negative error code on failure + */ +int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame); + +/** + * Read a frame from the queue. + * + * @param stream_idx index of the stream to read a frame for. May be -1, then + * try to read a frame from any stream that is ready for + * output. + * @param frame output frame will be written here on success. The frame is owned + * by the caller. + * + * @return + * - a non-negative index of the stream to which the returned frame belongs + * - AVERROR(EAGAIN) when more frames need to be submitted to the queue + * - AVERROR_EOF when no more frames will be available for this stream (for any + * stream if stream_idx is -1) + * - another negative error code on failure + */ +int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame); + +#endif // FFTOOLS_SYNC_QUEUE_H diff --git a/tests/fate/ffmpeg.mak b/tests/fate/ffmpeg.mak index 154af2fac8..38a1ae7ed5 100644 --- a/tests/fate/ffmpeg.mak +++ b/tests/fate/ffmpeg.mak @@ -105,7 +105,7 @@ FATE_SAMPLES_FFMPEG-$(call ALLYES, COLOR_FILTER, VOBSUB_DEMUXER, MATROSKA_DEMUXE fate-shortest-sub: CMD = enc_dec \ vobsub $(TARGET_SAMPLES)/sub/vobsub.idx matroska \ "-filter_complex 'color=s=1x1:rate=1:duration=400' -pix_fmt rgb24 -allow_raw_vfw 1 -c:s copy -c:v rawvideo" \ - framecrc "-map 0 -c copy -shortest" + framecrc "-map 0 -c copy -shortest -shortest_buf_duration 40" # Basic test for fix_sub_duration, which calculates duration based on the # following subtitle's pts. diff --git a/tests/ref/fate/copy-shortest1 b/tests/ref/fate/copy-shortest1 index 5038973e4e..87bee4c41f 100644 --- a/tests/ref/fate/copy-shortest1 +++ b/tests/ref/fate/copy-shortest1 @@ -120,4 +120,3 @@ 0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2 1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af 0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d -1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d diff --git a/tests/ref/fate/copy-shortest2 b/tests/ref/fate/copy-shortest2 index 5038973e4e..87bee4c41f 100644 --- a/tests/ref/fate/copy-shortest2 +++ b/tests/ref/fate/copy-shortest2 @@ -120,4 +120,3 @@ 0, 98304, 98304, 2048, 11182, e35a2ab846029effdbca0e43639717f2 1, 85760, 85760, 1536, 418, cf52ea7fc69e4c5bc8f75b354dfe60af 0, 100352, 100352, 2048, 1423, f480272c7d0b97834bc8ea36cceca61d -1, 87296, 87296, 1536, 418, 78ab22657a1b6c8a0e5b8612ceb8081d diff --git a/tests/ref/fate/shortest-sub b/tests/ref/fate/shortest-sub index be0922fd56..0da4ba2e95 100644 --- a/tests/ref/fate/shortest-sub +++ b/tests/ref/fate/shortest-sub @@ -1,4 +1,4 @@ 145b9b48d56f9c966bf41657f7569954 *tests/data/fate/shortest-sub.matroska 139232 tests/data/fate/shortest-sub.matroska -d71f5d359ef788ea689415bc1e4a90df *tests/data/fate/shortest-sub.out.framecrc -stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes: 2591/ 26055 +876ac3fa52e467050ab843969d4cf343 *tests/data/fate/shortest-sub.out.framecrc +stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes: 2591/ 23735