avutil: add thread executor
The executor design pattern was introduced by java <https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/Executor.html> it also adapted by python <https://docs.python.org/3/library/concurrent.futures.html> Compared to handcrafted thread pool management, it greatly simplifies the thread code. Signed-off-by: Michael Niedermayer <michael@niedermayer.cc>
This commit is contained in:
		
							parent
							
								
									139e54911c
								
							
						
					
					
						commit
						25ecc94d58
					
				@ -2,6 +2,9 @@ The last version increases of all libraries were on 2023-02-09
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
API changes, most recent first:
 | 
					API changes, most recent first:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					2023-09-02 - xxxxxxxxxx - lavu 58.19.100 - executor.h
 | 
				
			||||||
 | 
					  Add AVExecutor API
 | 
				
			||||||
 | 
					
 | 
				
			||||||
2023-09-xx - xxxxxxxxxx - lavc 60.25.100 - avfft.h
 | 
					2023-09-xx - xxxxxxxxxx - lavc 60.25.100 - avfft.h
 | 
				
			||||||
  The entire header will be deprecated and removed in two major bumps.
 | 
					  The entire header will be deprecated and removed in two major bumps.
 | 
				
			||||||
  For a replacement to av_dct, av_rdft, av_fft and av_mdct, use
 | 
					  For a replacement to av_dct, av_rdft, av_fft and av_mdct, use
 | 
				
			||||||
 | 
				
			|||||||
@ -31,6 +31,7 @@ HEADERS = adler32.h                                                     \
 | 
				
			|||||||
          encryption_info.h                                             \
 | 
					          encryption_info.h                                             \
 | 
				
			||||||
          error.h                                                       \
 | 
					          error.h                                                       \
 | 
				
			||||||
          eval.h                                                        \
 | 
					          eval.h                                                        \
 | 
				
			||||||
 | 
					          executor.h                                                    \
 | 
				
			||||||
          fifo.h                                                        \
 | 
					          fifo.h                                                        \
 | 
				
			||||||
          file.h                                                        \
 | 
					          file.h                                                        \
 | 
				
			||||||
          frame.h                                                       \
 | 
					          frame.h                                                       \
 | 
				
			||||||
@ -127,6 +128,7 @@ OBJS = adler32.o                                                        \
 | 
				
			|||||||
       encryption_info.o                                                \
 | 
					       encryption_info.o                                                \
 | 
				
			||||||
       error.o                                                          \
 | 
					       error.o                                                          \
 | 
				
			||||||
       eval.o                                                           \
 | 
					       eval.o                                                           \
 | 
				
			||||||
 | 
					       executor.o                                                       \
 | 
				
			||||||
       fifo.o                                                           \
 | 
					       fifo.o                                                           \
 | 
				
			||||||
       file.o                                                           \
 | 
					       file.o                                                           \
 | 
				
			||||||
       file_open.o                                                      \
 | 
					       file_open.o                                                      \
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										201
									
								
								libavutil/executor.c
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										201
									
								
								libavutil/executor.c
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,201 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Copyright (C) 2023 Nuo Mi
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * This file is part of FFmpeg.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * FFmpeg is free software; you can redistribute it and/or
 | 
				
			||||||
 | 
					 * modify it under the terms of the GNU Lesser General Public
 | 
				
			||||||
 | 
					 * License as published by the Free Software Foundation; either
 | 
				
			||||||
 | 
					 * version 2.1 of the License, or (at your option) any later version.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * FFmpeg is distributed in the hope that it will be useful,
 | 
				
			||||||
 | 
					 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
				
			||||||
 | 
					 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | 
				
			||||||
 | 
					 * Lesser General Public License for more details.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * You should have received a copy of the GNU Lesser General Public
 | 
				
			||||||
 | 
					 * License along with FFmpeg; if not, write to the Free Software
 | 
				
			||||||
 | 
					 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					#include "internal.h"
 | 
				
			||||||
 | 
					#include "mem.h"
 | 
				
			||||||
 | 
					#include "thread.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#include "executor.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#if !HAVE_THREADS
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define ExecutorThread  char
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define executor_thread_create(t, a, s, ar)      0
 | 
				
			||||||
 | 
					#define executor_thread_join(t, r)               do {} while(0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define ExecutorThread  pthread_t
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define executor_thread_create(t, a, s, ar)      pthread_create(t, a, s, ar)
 | 
				
			||||||
 | 
					#define executor_thread_join(t, r)               pthread_join(t, r)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#endif //!HAVE_THREADS
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					typedef struct ThreadInfo {
 | 
				
			||||||
 | 
					    AVExecutor *e;
 | 
				
			||||||
 | 
					    ExecutorThread thread;
 | 
				
			||||||
 | 
					} ThreadInfo;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					struct AVExecutor {
 | 
				
			||||||
 | 
					    AVTaskCallbacks cb;
 | 
				
			||||||
 | 
					    int thread_count;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ThreadInfo *threads;
 | 
				
			||||||
 | 
					    uint8_t *local_contexts;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    AVMutex lock;
 | 
				
			||||||
 | 
					    AVCond cond;
 | 
				
			||||||
 | 
					    int die;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    AVTask *tasks;
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static AVTask* remove_task(AVTask **prev, AVTask *t)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    *prev  = t->next;
 | 
				
			||||||
 | 
					    t->next = NULL;
 | 
				
			||||||
 | 
					    return t;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void add_task(AVTask **prev, AVTask *t)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    t->next = *prev;
 | 
				
			||||||
 | 
					    *prev   = t;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static int run_one_task(AVExecutor *e, void *lc)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    AVTaskCallbacks *cb = &e->cb;
 | 
				
			||||||
 | 
					    AVTask **prev;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for (prev = &e->tasks; *prev && !cb->ready(*prev, cb->user_data); prev = &(*prev)->next)
 | 
				
			||||||
 | 
					        /* nothing */;
 | 
				
			||||||
 | 
					    if (*prev) {
 | 
				
			||||||
 | 
					        AVTask *t = remove_task(prev, *prev);
 | 
				
			||||||
 | 
					        ff_mutex_unlock(&e->lock);
 | 
				
			||||||
 | 
					        cb->run(t, lc, cb->user_data);
 | 
				
			||||||
 | 
					        ff_mutex_lock(&e->lock);
 | 
				
			||||||
 | 
					        return 1;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return 0;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#if HAVE_THREADS
 | 
				
			||||||
 | 
					static void *executor_worker_task(void *data)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    ThreadInfo *ti = (ThreadInfo*)data;
 | 
				
			||||||
 | 
					    AVExecutor *e  = ti->e;
 | 
				
			||||||
 | 
					    void *lc       = e->local_contexts + (ti - e->threads) * e->cb.local_context_size;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ff_mutex_lock(&e->lock);
 | 
				
			||||||
 | 
					    while (1) {
 | 
				
			||||||
 | 
					        if (e->die) break;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (!run_one_task(e, lc)) {
 | 
				
			||||||
 | 
					            //no task in one loop
 | 
				
			||||||
 | 
					            ff_cond_wait(&e->cond, &e->lock);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    ff_mutex_unlock(&e->lock);
 | 
				
			||||||
 | 
					    return NULL;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    if (e->thread_count) {
 | 
				
			||||||
 | 
					        //signal die
 | 
				
			||||||
 | 
					        ff_mutex_lock(&e->lock);
 | 
				
			||||||
 | 
					        e->die = 1;
 | 
				
			||||||
 | 
					        ff_cond_broadcast(&e->cond);
 | 
				
			||||||
 | 
					        ff_mutex_unlock(&e->lock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for (int i = 0; i < e->thread_count; i++)
 | 
				
			||||||
 | 
					            executor_thread_join(e->threads[i].thread, NULL);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    if (has_cond)
 | 
				
			||||||
 | 
					        ff_cond_destroy(&e->cond);
 | 
				
			||||||
 | 
					    if (has_lock)
 | 
				
			||||||
 | 
					        ff_mutex_destroy(&e->lock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    av_free(e->threads);
 | 
				
			||||||
 | 
					    av_free(e->local_contexts);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    av_free(e);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					AVExecutor* av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    AVExecutor *e;
 | 
				
			||||||
 | 
					    int has_lock = 0, has_cond = 0;
 | 
				
			||||||
 | 
					    if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher)
 | 
				
			||||||
 | 
					        return NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    e = av_mallocz(sizeof(*e));
 | 
				
			||||||
 | 
					    if (!e)
 | 
				
			||||||
 | 
					        return NULL;
 | 
				
			||||||
 | 
					    e->cb = *cb;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    e->local_contexts = av_calloc(thread_count, e->cb.local_context_size);
 | 
				
			||||||
 | 
					    if (!e->local_contexts)
 | 
				
			||||||
 | 
					        goto free_executor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    e->threads = av_calloc(thread_count, sizeof(*e->threads));
 | 
				
			||||||
 | 
					    if (!e->threads)
 | 
				
			||||||
 | 
					        goto free_executor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    has_lock = !ff_mutex_init(&e->lock, NULL);
 | 
				
			||||||
 | 
					    has_cond = !ff_cond_init(&e->cond, NULL);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (!has_lock || !has_cond)
 | 
				
			||||||
 | 
					        goto free_executor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for (/* nothing */; e->thread_count < thread_count; e->thread_count++) {
 | 
				
			||||||
 | 
					        ThreadInfo *ti = e->threads + e->thread_count;
 | 
				
			||||||
 | 
					        ti->e = e;
 | 
				
			||||||
 | 
					        if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti))
 | 
				
			||||||
 | 
					            goto free_executor;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return e;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					free_executor:
 | 
				
			||||||
 | 
					    executor_free(e, has_lock, has_cond);
 | 
				
			||||||
 | 
					    return NULL;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void av_executor_free(AVExecutor **executor)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    if (!executor || !*executor)
 | 
				
			||||||
 | 
					        return;
 | 
				
			||||||
 | 
					    executor_free(*executor, 1, 1);
 | 
				
			||||||
 | 
					    *executor = NULL;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void av_executor_execute(AVExecutor *e, AVTask *t)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    AVTaskCallbacks *cb = &e->cb;
 | 
				
			||||||
 | 
					    AVTask **prev;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ff_mutex_lock(&e->lock);
 | 
				
			||||||
 | 
					    if (t) {
 | 
				
			||||||
 | 
					        for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
 | 
				
			||||||
 | 
					            /* nothing */;
 | 
				
			||||||
 | 
					        add_task(prev, t);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    ff_cond_signal(&e->cond);
 | 
				
			||||||
 | 
					    ff_mutex_unlock(&e->lock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#if !HAVE_THREADS
 | 
				
			||||||
 | 
					    // We are running in a single-threaded environment, so we must handle all tasks ourselves
 | 
				
			||||||
 | 
					    while (run_one_task(e, e->local_contexts))
 | 
				
			||||||
 | 
					        /* nothing */;
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										67
									
								
								libavutil/executor.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										67
									
								
								libavutil/executor.h
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,67 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Copyright (C) 2023 Nuo Mi
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * This file is part of FFmpeg.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * FFmpeg is free software; you can redistribute it and/or
 | 
				
			||||||
 | 
					 * modify it under the terms of the GNU Lesser General Public
 | 
				
			||||||
 | 
					 * License as published by the Free Software Foundation; either
 | 
				
			||||||
 | 
					 * version 2.1 of the License, or (at your option) any later version.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * FFmpeg is distributed in the hope that it will be useful,
 | 
				
			||||||
 | 
					 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
				
			||||||
 | 
					 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | 
				
			||||||
 | 
					 * Lesser General Public License for more details.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * You should have received a copy of the GNU Lesser General Public
 | 
				
			||||||
 | 
					 * License along with FFmpeg; if not, write to the Free Software
 | 
				
			||||||
 | 
					 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#ifndef AVUTIL_EXECUTOR_H
 | 
				
			||||||
 | 
					#define AVUTIL_EXECUTOR_H
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					typedef struct AVExecutor AVExecutor;
 | 
				
			||||||
 | 
					typedef struct AVTask AVTask;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					struct AVTask {
 | 
				
			||||||
 | 
					    AVTask *next;
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					typedef struct AVTaskCallbacks {
 | 
				
			||||||
 | 
					    void *user_data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    int local_context_size;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // return 1 if a's priority > b's priority
 | 
				
			||||||
 | 
					    int (*priority_higher)(const AVTask *a, const AVTask *b);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // task is ready for run
 | 
				
			||||||
 | 
					    int (*ready)(const AVTask *t, void *user_data);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // run the task
 | 
				
			||||||
 | 
					    int (*run)(AVTask *t, void *local_context, void *user_data);
 | 
				
			||||||
 | 
					} AVTaskCallbacks;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Alloc executor
 | 
				
			||||||
 | 
					 * @param callbacks callback structure for executor
 | 
				
			||||||
 | 
					 * @param thread_count worker thread number
 | 
				
			||||||
 | 
					 * @return return the executor
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					AVExecutor* av_executor_alloc(const AVTaskCallbacks *callbacks, int thread_count);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Free executor
 | 
				
			||||||
 | 
					 * @param e  pointer to executor
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					void av_executor_free(AVExecutor **e);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Add task to executor
 | 
				
			||||||
 | 
					 * @param e pointer to executor
 | 
				
			||||||
 | 
					 * @param t pointer to task. If NULL, it will wakeup one work thread
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					void av_executor_execute(AVExecutor *e, AVTask *t);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#endif //AVUTIL_EXECUTOR_H
 | 
				
			||||||
@ -79,7 +79,7 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define LIBAVUTIL_VERSION_MAJOR  58
 | 
					#define LIBAVUTIL_VERSION_MAJOR  58
 | 
				
			||||||
#define LIBAVUTIL_VERSION_MINOR  18
 | 
					#define LIBAVUTIL_VERSION_MINOR  19
 | 
				
			||||||
#define LIBAVUTIL_VERSION_MICRO 100
 | 
					#define LIBAVUTIL_VERSION_MICRO 100
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define LIBAVUTIL_VERSION_INT   AV_VERSION_INT(LIBAVUTIL_VERSION_MAJOR, \
 | 
					#define LIBAVUTIL_VERSION_INT   AV_VERSION_INT(LIBAVUTIL_VERSION_MAJOR, \
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user