avconv: multithreaded demuxing.
When there are multiple input files, run demuxing for each input file in a separate thread, so reading packets does not block. This is useful for achieving low latency when reading from multiple (possibly slow) input streams.
This commit is contained in:
		
							parent
							
								
									b0f0dfc485
								
							
						
					
					
						commit
						5db5169e46
					
				
							
								
								
									
										157
									
								
								avconv.c
									
									
									
									
									
								
							
							
						
						
									
										157
									
								
								avconv.c
									
									
									
									
									
								
							| @ -69,6 +69,14 @@ | |||||||
| #include <sys/select.h> | #include <sys/select.h> | ||||||
| #endif | #endif | ||||||
| 
 | 
 | ||||||
|  | #if HAVE_THREADS | ||||||
|  | #if HAVE_PTHREADS | ||||||
|  | #include <pthread.h> | ||||||
|  | #else | ||||||
|  | #include "libavcodec/w32pthreads.h" | ||||||
|  | #endif | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
| #include <time.h> | #include <time.h> | ||||||
| 
 | 
 | ||||||
| #include "cmdutils.h" | #include "cmdutils.h" | ||||||
| @ -140,6 +148,11 @@ static float dts_delta_threshold = 10; | |||||||
| 
 | 
 | ||||||
| static int print_stats = 1; | static int print_stats = 1; | ||||||
| 
 | 
 | ||||||
|  | #if HAVE_THREADS | ||||||
|  | /* signal to input threads that they should exit; set by the main thread */ | ||||||
|  | static int transcoding_finished; | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
| #define DEFAULT_PASS_LOGFILENAME_PREFIX "av2pass" | #define DEFAULT_PASS_LOGFILENAME_PREFIX "av2pass" | ||||||
| 
 | 
 | ||||||
| typedef struct InputFilter { | typedef struct InputFilter { | ||||||
| @ -219,6 +232,15 @@ typedef struct InputFile { | |||||||
|     int nb_streams;       /* number of stream that avconv is aware of; may be different
 |     int nb_streams;       /* number of stream that avconv is aware of; may be different
 | ||||||
|                              from ctx.nb_streams if new streams appear during av_read_frame() */ |                              from ctx.nb_streams if new streams appear during av_read_frame() */ | ||||||
|     int rate_emu; |     int rate_emu; | ||||||
|  | 
 | ||||||
|  | #if HAVE_THREADS | ||||||
|  |     pthread_t thread;           /* thread reading from this file */ | ||||||
|  |     int finished;               /* the thread has exited */ | ||||||
|  |     int joined;                 /* the thread has been joined */ | ||||||
|  |     pthread_mutex_t fifo_lock;  /* lock for access to fifo */ | ||||||
|  |     pthread_cond_t  fifo_cond;  /* the main thread will signal on this cond after reading from fifo */ | ||||||
|  |     AVFifoBuffer *fifo;         /* demuxed packets are stored here; freed by the main thread */ | ||||||
|  | #endif | ||||||
| } InputFile; | } InputFile; | ||||||
| 
 | 
 | ||||||
| typedef struct OutputStream { | typedef struct OutputStream { | ||||||
| @ -2765,6 +2787,125 @@ static int select_input_file(uint8_t *no_packet) | |||||||
|     return file_index; |     return file_index; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | #if HAVE_THREADS | ||||||
|  | static void *input_thread(void *arg) | ||||||
|  | { | ||||||
|  |     InputFile *f = arg; | ||||||
|  |     int ret = 0; | ||||||
|  | 
 | ||||||
|  |     while (!transcoding_finished && ret >= 0) { | ||||||
|  |         AVPacket pkt; | ||||||
|  |         ret = av_read_frame(f->ctx, &pkt); | ||||||
|  | 
 | ||||||
|  |         if (ret == AVERROR(EAGAIN)) { | ||||||
|  |             usleep(10000); | ||||||
|  |             ret = 0; | ||||||
|  |             continue; | ||||||
|  |         } else if (ret < 0) | ||||||
|  |             break; | ||||||
|  | 
 | ||||||
|  |         pthread_mutex_lock(&f->fifo_lock); | ||||||
|  |         while (!av_fifo_space(f->fifo)) | ||||||
|  |             pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); | ||||||
|  | 
 | ||||||
|  |         av_dup_packet(&pkt); | ||||||
|  |         av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL); | ||||||
|  | 
 | ||||||
|  |         pthread_mutex_unlock(&f->fifo_lock); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     f->finished = 1; | ||||||
|  |     return NULL; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | static void free_input_threads(void) | ||||||
|  | { | ||||||
|  |     int i; | ||||||
|  | 
 | ||||||
|  |     if (nb_input_files == 1) | ||||||
|  |         return; | ||||||
|  | 
 | ||||||
|  |     transcoding_finished = 1; | ||||||
|  | 
 | ||||||
|  |     for (i = 0; i < nb_input_files; i++) { | ||||||
|  |         InputFile *f = input_files[i]; | ||||||
|  |         AVPacket pkt; | ||||||
|  | 
 | ||||||
|  |         if (f->joined) | ||||||
|  |             continue; | ||||||
|  | 
 | ||||||
|  |         pthread_mutex_lock(&f->fifo_lock); | ||||||
|  |         while (av_fifo_size(f->fifo)) { | ||||||
|  |             av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); | ||||||
|  |             av_free_packet(&pkt); | ||||||
|  |         } | ||||||
|  |         pthread_cond_signal(&f->fifo_cond); | ||||||
|  |         pthread_mutex_unlock(&f->fifo_lock); | ||||||
|  | 
 | ||||||
|  |         pthread_join(f->thread, NULL); | ||||||
|  |         f->joined = 1; | ||||||
|  | 
 | ||||||
|  |         while (av_fifo_size(f->fifo)) { | ||||||
|  |             av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); | ||||||
|  |             av_free_packet(&pkt); | ||||||
|  |         } | ||||||
|  |         av_fifo_free(f->fifo); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | static int init_input_threads(void) | ||||||
|  | { | ||||||
|  |     int i, ret; | ||||||
|  | 
 | ||||||
|  |     if (nb_input_files == 1) | ||||||
|  |         return 0; | ||||||
|  | 
 | ||||||
|  |     for (i = 0; i < nb_input_files; i++) { | ||||||
|  |         InputFile *f = input_files[i]; | ||||||
|  | 
 | ||||||
|  |         if (!(f->fifo = av_fifo_alloc(8*sizeof(AVPacket)))) | ||||||
|  |             return AVERROR(ENOMEM); | ||||||
|  | 
 | ||||||
|  |         pthread_mutex_init(&f->fifo_lock, NULL); | ||||||
|  |         pthread_cond_init (&f->fifo_cond, NULL); | ||||||
|  | 
 | ||||||
|  |         if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) | ||||||
|  |             return AVERROR(ret); | ||||||
|  |     } | ||||||
|  |     return 0; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | static int get_input_packet_mt(InputFile *f, AVPacket *pkt) | ||||||
|  | { | ||||||
|  |     int ret = 0; | ||||||
|  | 
 | ||||||
|  |     pthread_mutex_lock(&f->fifo_lock); | ||||||
|  | 
 | ||||||
|  |     if (av_fifo_size(f->fifo)) { | ||||||
|  |         av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL); | ||||||
|  |         pthread_cond_signal(&f->fifo_cond); | ||||||
|  |     } else { | ||||||
|  |         if (f->finished) | ||||||
|  |             ret = AVERROR_EOF; | ||||||
|  |         else | ||||||
|  |             ret = AVERROR(EAGAIN); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pthread_mutex_unlock(&f->fifo_lock); | ||||||
|  | 
 | ||||||
|  |     return ret; | ||||||
|  | } | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
|  | static int get_input_packet(InputFile *f, AVPacket *pkt) | ||||||
|  | { | ||||||
|  | #if HAVE_THREADS | ||||||
|  |     if (nb_input_files > 1) | ||||||
|  |         return get_input_packet_mt(f, pkt); | ||||||
|  | #endif | ||||||
|  |     return av_read_frame(f->ctx, pkt); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| /*
 | /*
 | ||||||
|  * The following code is the main loop of the file converter |  * The following code is the main loop of the file converter | ||||||
|  */ |  */ | ||||||
| @ -2790,6 +2931,11 @@ static int transcode(void) | |||||||
| 
 | 
 | ||||||
|     timer_start = av_gettime(); |     timer_start = av_gettime(); | ||||||
| 
 | 
 | ||||||
|  | #if HAVE_THREADS | ||||||
|  |     if ((ret = init_input_threads()) < 0) | ||||||
|  |         goto fail; | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
|     for (; received_sigterm == 0;) { |     for (; received_sigterm == 0;) { | ||||||
|         int file_index, ist_index; |         int file_index, ist_index; | ||||||
|         AVPacket pkt; |         AVPacket pkt; | ||||||
| @ -2810,12 +2956,13 @@ static int transcode(void) | |||||||
|                 usleep(10000); |                 usleep(10000); | ||||||
|                 continue; |                 continue; | ||||||
|             } |             } | ||||||
|  |             av_log(NULL, AV_LOG_VERBOSE, "No more inputs to read from, finishing.\n"); | ||||||
|             break; |             break; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         /* read a frame from it and output it in the fifo */ |  | ||||||
|         is  = input_files[file_index]->ctx; |         is  = input_files[file_index]->ctx; | ||||||
|         ret = av_read_frame(is, &pkt); |         ret = get_input_packet(input_files[file_index], &pkt); | ||||||
|  | 
 | ||||||
|         if (ret == AVERROR(EAGAIN)) { |         if (ret == AVERROR(EAGAIN)) { | ||||||
|             no_packet[file_index] = 1; |             no_packet[file_index] = 1; | ||||||
|             no_packet_count++; |             no_packet_count++; | ||||||
| @ -2897,6 +3044,9 @@ static int transcode(void) | |||||||
|         /* dump report by using the output first video and audio streams */ |         /* dump report by using the output first video and audio streams */ | ||||||
|         print_report(0, timer_start); |         print_report(0, timer_start); | ||||||
|     } |     } | ||||||
|  | #if HAVE_THREADS | ||||||
|  |     free_input_threads(); | ||||||
|  | #endif | ||||||
| 
 | 
 | ||||||
|     /* at the end of stream, we must flush the decoder buffers */ |     /* at the end of stream, we must flush the decoder buffers */ | ||||||
|     for (i = 0; i < nb_input_streams; i++) { |     for (i = 0; i < nb_input_streams; i++) { | ||||||
| @ -2941,6 +3091,9 @@ static int transcode(void) | |||||||
| 
 | 
 | ||||||
|  fail: |  fail: | ||||||
|     av_freep(&no_packet); |     av_freep(&no_packet); | ||||||
|  | #if HAVE_THREADS | ||||||
|  |     free_input_threads(); | ||||||
|  | #endif | ||||||
| 
 | 
 | ||||||
|     if (output_streams) { |     if (output_streams) { | ||||||
|         for (i = 0; i < nb_output_streams; i++) { |         for (i = 0; i < nb_output_streams; i++) { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user