fftools/ffmpeg_sched: make sure to always run task cleanup
Even in cases where sch_start() failed. This ensures all links are properly closed and no tasks are left hanging. Fixes #10916. (cherry picked from commit 24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b) Signed-off-by: Anton Khirnov <anton@khirnov.net>
This commit is contained in:
parent
da903c558b
commit
536443919f
@ -260,6 +260,12 @@ typedef struct SchFilterGraph {
|
|||||||
int task_exited;
|
int task_exited;
|
||||||
} SchFilterGraph;
|
} SchFilterGraph;
|
||||||
|
|
||||||
|
enum SchedulerState {
|
||||||
|
SCH_STATE_UNINIT,
|
||||||
|
SCH_STATE_STARTED,
|
||||||
|
SCH_STATE_STOPPED,
|
||||||
|
};
|
||||||
|
|
||||||
struct Scheduler {
|
struct Scheduler {
|
||||||
const AVClass *class;
|
const AVClass *class;
|
||||||
|
|
||||||
@ -292,7 +298,7 @@ struct Scheduler {
|
|||||||
char *sdp_filename;
|
char *sdp_filename;
|
||||||
int sdp_auto;
|
int sdp_auto;
|
||||||
|
|
||||||
int transcode_started;
|
enum SchedulerState state;
|
||||||
atomic_int terminate;
|
atomic_int terminate;
|
||||||
atomic_int task_failed;
|
atomic_int task_failed;
|
||||||
|
|
||||||
@ -1144,7 +1150,8 @@ int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
|
|||||||
|
|
||||||
// this may be called during initialization - do not start
|
// this may be called during initialization - do not start
|
||||||
// threads before sch_start() is called
|
// threads before sch_start() is called
|
||||||
if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started)
|
if (++mux->nb_streams_ready == mux->nb_streams &&
|
||||||
|
sch->state >= SCH_STATE_STARTED)
|
||||||
ret = mux_init(sch, mux);
|
ret = mux_init(sch, mux);
|
||||||
|
|
||||||
pthread_mutex_unlock(&sch->mux_ready_lock);
|
pthread_mutex_unlock(&sch->mux_ready_lock);
|
||||||
@ -1514,7 +1521,8 @@ int sch_start(Scheduler *sch)
|
|||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
sch->transcode_started = 1;
|
av_assert0(sch->state == SCH_STATE_UNINIT);
|
||||||
|
sch->state = SCH_STATE_STARTED;
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_mux; i++) {
|
for (unsigned i = 0; i < sch->nb_mux; i++) {
|
||||||
SchMux *mux = &sch->mux[i];
|
SchMux *mux = &sch->mux[i];
|
||||||
@ -1522,7 +1530,7 @@ int sch_start(Scheduler *sch)
|
|||||||
if (mux->nb_streams_ready == mux->nb_streams) {
|
if (mux->nb_streams_ready == mux->nb_streams) {
|
||||||
ret = mux_init(sch, mux);
|
ret = mux_init(sch, mux);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
goto fail;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1531,7 +1539,7 @@ int sch_start(Scheduler *sch)
|
|||||||
|
|
||||||
ret = task_start(&enc->task);
|
ret = task_start(&enc->task);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_filters; i++) {
|
for (unsigned i = 0; i < sch->nb_filters; i++) {
|
||||||
@ -1539,7 +1547,7 @@ int sch_start(Scheduler *sch)
|
|||||||
|
|
||||||
ret = task_start(&fg->task);
|
ret = task_start(&fg->task);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_dec; i++) {
|
for (unsigned i = 0; i < sch->nb_dec; i++) {
|
||||||
@ -1547,7 +1555,7 @@ int sch_start(Scheduler *sch)
|
|||||||
|
|
||||||
ret = task_start(&dec->task);
|
ret = task_start(&dec->task);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_demux; i++) {
|
for (unsigned i = 0; i < sch->nb_demux; i++) {
|
||||||
@ -1558,7 +1566,7 @@ int sch_start(Scheduler *sch)
|
|||||||
|
|
||||||
ret = task_start(&d->task);
|
ret = task_start(&d->task);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&sch->schedule_lock);
|
pthread_mutex_lock(&sch->schedule_lock);
|
||||||
@ -1566,6 +1574,9 @@ int sch_start(Scheduler *sch)
|
|||||||
pthread_mutex_unlock(&sch->schedule_lock);
|
pthread_mutex_unlock(&sch->schedule_lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
fail:
|
||||||
|
sch_stop(sch, NULL);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
|
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
|
||||||
@ -2414,6 +2425,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
|
|||||||
return send_to_filter(sch, fg, fg->nb_inputs, frame);
|
return send_to_filter(sch, fg, fg->nb_inputs, frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int task_cleanup(Scheduler *sch, SchedulerNode node)
|
||||||
|
{
|
||||||
|
switch (node.type) {
|
||||||
|
case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
|
||||||
|
case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
|
||||||
|
case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
|
||||||
|
case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
|
||||||
|
case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
|
||||||
|
default: av_assert0(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void *task_wrapper(void *arg)
|
static void *task_wrapper(void *arg)
|
||||||
{
|
{
|
||||||
SchTask *task = arg;
|
SchTask *task = arg;
|
||||||
@ -2426,15 +2449,7 @@ static void *task_wrapper(void *arg)
|
|||||||
av_log(task->func_arg, AV_LOG_ERROR,
|
av_log(task->func_arg, AV_LOG_ERROR,
|
||||||
"Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
|
"Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
|
||||||
|
|
||||||
switch (task->node.type) {
|
err = task_cleanup(sch, task->node);
|
||||||
case SCH_NODE_TYPE_DEMUX: err = demux_done (sch, task->node.idx); break;
|
|
||||||
case SCH_NODE_TYPE_MUX: err = mux_done (sch, task->node.idx); break;
|
|
||||||
case SCH_NODE_TYPE_DEC: err = dec_done (sch, task->node.idx); break;
|
|
||||||
case SCH_NODE_TYPE_ENC: err = enc_done (sch, task->node.idx); break;
|
|
||||||
case SCH_NODE_TYPE_FILTER_IN: err = filter_done(sch, task->node.idx); break;
|
|
||||||
default: av_assert0(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = err_merge(ret, err);
|
ret = err_merge(ret, err);
|
||||||
|
|
||||||
// EOF is considered normal termination
|
// EOF is considered normal termination
|
||||||
@ -2450,13 +2465,13 @@ static void *task_wrapper(void *arg)
|
|||||||
return (void*)(intptr_t)ret;
|
return (void*)(intptr_t)ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int task_stop(SchTask *task)
|
static int task_stop(Scheduler *sch, SchTask *task)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
void *thread_ret;
|
void *thread_ret;
|
||||||
|
|
||||||
if (!task->thread_running)
|
if (!task->thread_running)
|
||||||
return 0;
|
return task_cleanup(sch, task->node);
|
||||||
|
|
||||||
ret = pthread_join(task->thread, &thread_ret);
|
ret = pthread_join(task->thread, &thread_ret);
|
||||||
av_assert0(ret == 0);
|
av_assert0(ret == 0);
|
||||||
@ -2470,6 +2485,9 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
|
|||||||
{
|
{
|
||||||
int ret = 0, err;
|
int ret = 0, err;
|
||||||
|
|
||||||
|
if (sch->state != SCH_STATE_STARTED)
|
||||||
|
return 0;
|
||||||
|
|
||||||
atomic_store(&sch->terminate, 1);
|
atomic_store(&sch->terminate, 1);
|
||||||
|
|
||||||
for (unsigned type = 0; type < 2; type++)
|
for (unsigned type = 0; type < 2; type++)
|
||||||
@ -2481,40 +2499,42 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
|
|||||||
for (unsigned i = 0; i < sch->nb_demux; i++) {
|
for (unsigned i = 0; i < sch->nb_demux; i++) {
|
||||||
SchDemux *d = &sch->demux[i];
|
SchDemux *d = &sch->demux[i];
|
||||||
|
|
||||||
err = task_stop(&d->task);
|
err = task_stop(sch, &d->task);
|
||||||
ret = err_merge(ret, err);
|
ret = err_merge(ret, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_dec; i++) {
|
for (unsigned i = 0; i < sch->nb_dec; i++) {
|
||||||
SchDec *dec = &sch->dec[i];
|
SchDec *dec = &sch->dec[i];
|
||||||
|
|
||||||
err = task_stop(&dec->task);
|
err = task_stop(sch, &dec->task);
|
||||||
ret = err_merge(ret, err);
|
ret = err_merge(ret, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_filters; i++) {
|
for (unsigned i = 0; i < sch->nb_filters; i++) {
|
||||||
SchFilterGraph *fg = &sch->filters[i];
|
SchFilterGraph *fg = &sch->filters[i];
|
||||||
|
|
||||||
err = task_stop(&fg->task);
|
err = task_stop(sch, &fg->task);
|
||||||
ret = err_merge(ret, err);
|
ret = err_merge(ret, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_enc; i++) {
|
for (unsigned i = 0; i < sch->nb_enc; i++) {
|
||||||
SchEnc *enc = &sch->enc[i];
|
SchEnc *enc = &sch->enc[i];
|
||||||
|
|
||||||
err = task_stop(&enc->task);
|
err = task_stop(sch, &enc->task);
|
||||||
ret = err_merge(ret, err);
|
ret = err_merge(ret, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = 0; i < sch->nb_mux; i++) {
|
for (unsigned i = 0; i < sch->nb_mux; i++) {
|
||||||
SchMux *mux = &sch->mux[i];
|
SchMux *mux = &sch->mux[i];
|
||||||
|
|
||||||
err = task_stop(&mux->task);
|
err = task_stop(sch, &mux->task);
|
||||||
ret = err_merge(ret, err);
|
ret = err_merge(ret, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (finish_ts)
|
if (finish_ts)
|
||||||
*finish_ts = trailing_dts(sch, 1);
|
*finish_ts = trailing_dts(sch, 1);
|
||||||
|
|
||||||
|
sch->state = SCH_STATE_STOPPED;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user