In case anyone raises these questions:
是的,ffmpeg等已经可以很好的处理多进程。
是的,MPP是处理AVS的很好工具。
但是,对于轻度滤镜,AVS的性能问题会比较突出。
所以我尝试用这个简单的Producer-Consumer模型解决。
https://github.com/cnbeining/parallel-transcode
代码在下面,晚上写的,比较脏,别吐槽。
#!/usr/bin/env python
#coding:utf-8
# Author:  --<>
# Purpose:
# Created: 11/05/2014

import sys
import unittest
import os
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
import getopt
import logging
import shutil
import subprocess
#import uuid
import threading
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module; keep the old alias
import time
import traceback

logging.basicConfig(level=logging.INFO)

# Module-wide state shared by the worker threads and the command-line parser.
# Directory the script was started from; concat_file() changes back into it.
basedir = os.getcwd()
#tmpdir = './multi-encode-' + str(uuid.uuid4())
tmpdir = '.'
# Extra ffmpeg arguments used when converting a segment (-a option).
arguments = ''
segment_cutted = []
# Where the lossless intermediate segments are written.
segment_cutted_dir = tmpdir + '/segment_cutted_dir'
segment_cutted_error = []
# Start times of the segments that were converted successfully.
segment_converted = []
# Where the converted segments are written.
segment_converted_dir = tmpdir + '/segment_converted_dir'
segment_converted_error = []
# Path of the file to transcode (-i option).
INPUT_FILE = ''
# Length of one segment, in seconds (-t option).
SEG_TIME = 60
# Number of segments that failed to be cut or converted.
IS_ERROR = 0
# 0: keep temporary files, 1: delete at the end of a stage, 2: delete on the fly.
DELETE = 1
# Complete user-supplied conversion command (-f option); overrides `arguments`.
full_command = ''


#----------------------------------------------------------------------
def probe_file(filename):
    """Run ffprobe on *filename* and return its ``-show_format`` output.

    Returns the text ffprobe wrote to stdout, or None (after printing the
    message) when ffprobe wrote anything to stderr.
    """
    cmnd = ['ffprobe', '-show_format', '-pretty', '-loglevel', 'quiet', filename]
    # universal_newlines=True makes the pipes yield text on Python 3 as well,
    # so callers can split the result on '\n' on either interpreter.
    p = subprocess.Popen(cmnd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    out, err = p.communicate()
    if err:
        print(err)
        return None
    return out


#----------------------------------------------------------------------
def time_to_sec(time_raw):
    """str->int

    Convert an ``H:MM:SS[.fraction]`` string into whole seconds; the
    fractional part of the seconds field is dropped.
    """
    fields = time_raw.split(':')
    hr = int(fields[0]) * 3600
    minute = int(fields[1]) * 60
    sec = int(float(fields[2]))
    return int(hr + minute + sec)


#----------------------------------------------------------------------
def sec_to_time(sec_raw):
    """int->str

    Convert a number of seconds into an unpadded ``H:M:S`` string,
    e.g. 3661 -> '1:1:1'.
    """
    hr = int(sec_raw // 3600)
    remainder = sec_raw - 3600 * hr
    # The original called input() here and multiplied by the builtin `min`
    # below, so the function could never produce a result.
    minute = int(remainder // 60)
    sec = remainder - 60 * minute
    return str(hr) + ':' + str(minute) + ':' + str(sec)
#----------------------------------------------------------------------
def _run_quiet(command, shell=False):
    """Run *command*, discard its output and return its exit status.

    Output is sent to os.devnull instead of subprocess.PIPE: nothing ever
    reads those pipes, so a verbose ffmpeg would block as soon as the OS pipe
    buffer filled up.  Returns -1 when the program cannot be started.
    """
    try:
        with open(os.devnull, 'w') as devnull:
            return subprocess.call(command, shell=shell,
                                   stdout=devnull, stderr=devnull)
    except OSError as err:
        logging.warning('Cannot run command: %s', err)
        return -1


#----------------------------------------------------------------------
def get_file_time(filename):
    """str->str

    Return the duration ffprobe reports for *filename*, exactly as printed
    after ``duration=`` (surrounding whitespace removed).  Logs a fatal
    message and exits when the file cannot be probed or no duration line is
    present.
    """
    logging.info('Detecting video info...')
    try:
        for line in probe_file(filename).split('\n'):
            if 'duration' in line:
                return str(line.split('=')[1]).strip()
    except Exception:
        # probe_file() returned None, ffprobe is missing, or the line was
        # malformed: all of these are reported the same way below.
        pass
    logging.fatal('Cannot read video file!')
    #shutil.rmtree(tmpdir)
    sys.exit(1)


#----------------------------------------------------------------------
def get_extname(input_file):
    """Return ``os.path.splitext(input_file)``, a ``(root, extension)`` tuple."""
    return os.path.splitext(input_file)


#----------------------------------------------------------------------
def make_segment_list(input_file_length, step):
    """int,int->list

    Both values must be in seconds.  Every item of the returned list is the
    start time of one segment; a trailing partial segment gets an entry too.
    """
    count = int(input_file_length // step)
    if input_file_length % step != 0:
        count += 1  # one more, shorter, segment for the remainder
    return [i * step for i in range(count)]


#----------------------------------------------------------------------
def cut_one_segment(start_time, input_file=None, step_time=None):
    """int,str,int-> int (and a file on disk)

    Cut *step_time* seconds of *input_file* starting at *start_time* (both in
    seconds) into ``<segment_cutted_dir>/<start_time>.mp4`` as lossless H.264.
    The input file must be readable by ffmpeg.  In case the audio gives us
    trouble, the audio stream is disabled.

    *input_file* and *step_time* default to the current values of the
    INPUT_FILE and SEG_TIME globals, looked up at call time.

    Returns *start_time* on success and -1 when ffmpeg fails.
    TODO: use the same format as convert_one_segment().
    """
    if input_file is None:
        input_file = INPUT_FILE
    if step_time is None:
        step_time = SEG_TIME
    logging.info('Cutting original file piece, start at {start_time}...'.format(start_time=start_time))
    output_file = '{segment_cutted_dir}/{start_time}.mp4'.format(
        segment_cutted_dir=segment_cutted_dir, start_time=start_time)
    # ffmpeg accepts plain seconds for -ss/-t.  An argument list (no shell)
    # keeps file names with spaces or shell metacharacters intact.
    command = ['ffmpeg', '-i', input_file,
               '-c:v', 'libx264', '-an', '-preset', 'ultrafast', '-qp', '0',
               '-ss', str(start_time), '-t', str(step_time),
               output_file]
    if _run_quiet(command) == 0:
        return start_time
    logging.warning('Segment cut failed! Start time at {start_time}'.format(start_time=start_time))
    return -1


#----------------------------------------------------------------------
def convert_one_segment(start_time, arguments, full_command=''):
    """int,str,str-> int (and a file on disk)

    Convert the previously cut segment that starts at *start_time*.

    With an empty *full_command* the segment is converted with
    ``ffmpeg -i <input> <arguments> <output>``.  Otherwise *full_command* is
    run, with the placeholders {INPUT_SEGMENT_FILE} and {OUPUT_SEGMENT_FILE}
    replaced by the segment paths.

    Returns *start_time* on success and -1 when the command fails.
    """
    logging.info('Converting original file piece, start at {start_time}...'.format(start_time=start_time))
    input_segment_file = '{segment_cutted_dir}/{start_time}.mp4'.format(
        segment_cutted_dir=segment_cutted_dir, start_time=start_time)
    ouput_segment_file = '{segment_converted_dir}/{start_time}_h264.mp4'.format(
        segment_converted_dir=segment_converted_dir, start_time=start_time)
    if not full_command:
        command = 'ffmpeg -i {INPUT_SEGMENT_FILE} {arguments} {OUPUT_SEGMENT_FILE}'.format(
            INPUT_SEGMENT_FILE=input_segment_file,
            OUPUT_SEGMENT_FILE=ouput_segment_file,
            arguments=arguments)
    else:
        command = full_command.format(INPUT_SEGMENT_FILE=input_segment_file,
                                      OUPUT_SEGMENT_FILE=ouput_segment_file)
    # The command is a user-written command line, so it still goes through
    # the shell; only the output handling differs from the cut step.
    if _run_quiet(command, shell=True) != 0:
        logging.warning('Segment convert failed! Start time at {start_time}'.format(start_time=start_time))
        return -1
    logging.info('Segment at {start_time} converted'.format(start_time=start_time))
    if DELETE == 2:
        # "Delete on the fly": the cut segment is no longer needed.
        try:
            os.remove(input_segment_file)
        except OSError:
            logging.warning('Cannot remove temp file!')
    return start_time


#----------------------------------------------------------------------
def test_if_convert_success(expected_segments=None):
    """Return True when as many segments were converted as were expected.

    *expected_segments* is the list of segment start times to compare against.
    NOTE(review): without it the function falls back to a global named
    ``segment_list``, which nothing in this file defines, so that path raises
    NameError exactly as the original did.
    """
    if expected_segments is None:
        expected_segments = segment_list
    return len(segment_converted) == len(expected_segments)

# Not a test case despite its name: keep nose/pytest from collecting it.
test_if_convert_success.__test__ = False


#----------------------------------------------------------------------
def concat_file(time_list=None, filename='video_to_convert'):
    """Join the converted segments into ``<filename>.mp4``.

    *time_list* holds the start times of the converted segments and defaults
    to the segment_converted global.  The segments are joined in ascending
    start-time order, because the converter threads finish in arbitrary order.
    The list for ffmpeg's concat demuxer is written to ``ff.txt`` in the
    directory the script was started from.
    """
    if time_list is None:
        time_list = segment_converted
    os.chdir(basedir)
    listing = ''.join(
        'file \'{cwd}/{i}_h264.mp4\'\n'.format(cwd=segment_converted_dir, i=i)
        for i in sorted(time_list))
    if not isinstance(listing, bytes):
        listing = listing.encode("utf8")  # Python 3 text -> bytes
    with open('ff.txt', 'wb') as f:
        f.write(listing)
    if DELETE > 0:
        try:
            shutil.rmtree(segment_cutted_dir)
        except OSError:
            logging.warning('Cannot delete temp dir!')
    logging.info('Concating videos...')
    command = ['ffmpeg', '-f', 'concat', '-i', 'ff.txt', '-c', 'copy',
               filename + '.mp4']
    if _run_quiet(command) == 0:
        logging.info('Concated!')
    else:
        logging.warning('Concat failed')


########################################################################
class CutVideo(threading.Thread):
    """Worker thread that cuts segments.

    Takes start times from *queue*, cuts the segment and, on success, puts the
    start time on *out_queue* for the converter threads.
    """

    #----------------------------------------------------------------------
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    #----------------------------------------------------------------------
    def run(self):
        global IS_ERROR
        while True:
            # grabs start time from queue
            start_time = self.queue.get()
            try:
                return_value = cut_one_segment(start_time,
                                               input_file=INPUT_FILE,
                                               step_time=SEG_TIME)
                if return_value == -1:
                    IS_ERROR += 1
                else:
                    self.out_queue.put(return_value)
            except Exception:
                logging.exception('[*] Segment Slicing Failed: Start time {start_time}'.format(start_time=start_time))
                IS_ERROR += 1
            finally:
                # Always signal completion, otherwise queue.join() in the
                # main thread would wait forever after an error.
                self.queue.task_done()


########################################################################
class ConvertThread(threading.Thread):
    """Worker thread that converts segments.

    Takes start times of cut segments from *out_queue*, converts them and
    records every success in the segment_converted global.
    """

    #----------------------------------------------------------------------
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    #----------------------------------------------------------------------
    def run(self):
        # Without this declaration `IS_ERROR += 1` raised UnboundLocalError,
        # which killed the thread before task_done() and hung the program.
        global IS_ERROR
        while True:
            # grabs time from queue
            start_time = self.out_queue.get()
            try:
                return_value = convert_one_segment(start_time,
                                                   arguments=arguments,
                                                   full_command=full_command)
                if return_value == -1:
                    logging.warning('[*] Segment Convertion Failed: Start time {start_time}'.format(start_time=start_time))
                    IS_ERROR += 1
                else:
                    segment_converted.append(start_time)
            except Exception:
                logging.exception('[*] Segment Convertion Failed: Start time {start_time}'.format(start_time=start_time))
                IS_ERROR += 1
            finally:
                self.out_queue.task_done()


#----------------------------------------------------------------------
def main(INPUT_FILE, slicer_thread=3, converter_thread=3):
    """Cut, convert and re-join *INPUT_FILE*.

    *slicer_thread* and *converter_thread* are the numbers of cutting and
    converting worker threads.  The result is written to
    ``<input name without extension>_converted.mp4`` in the start directory.
    Exits with a non-zero status when not every segment could be converted.
    """
    global queue
    global out_queue
    # The cutter threads read the module-level INPUT_FILE, so keep it in sync
    # with the argument (a `global` statement cannot name a parameter).
    globals()['INPUT_FILE'] = INPUT_FILE
    # Create each directory on its own: with one shared try block an existing
    # first directory prevented the second one from being created.
    for directory in (segment_converted_dir, segment_cutted_dir):
        try:
            os.mkdir(directory)
        except OSError:
            if not os.path.isdir(directory):
                logging.warning('Cannot create temp dir!')
    start = time.time()
    logging.info('Probing the file...')
    input_file_length = time_to_sec(get_file_time(INPUT_FILE))
    start_time_list = make_segment_list(input_file_length, SEG_TIME)
    logging.info('Start the threading!')
    queue = Queue.Queue(int(slicer_thread))
    out_queue = Queue.Queue()
    # The original passed slicer_thread for both thread counts.
    main_threading(start_time_list=start_time_list,
                   slicer_thread=slicer_thread,
                   converter_thread=converter_thread)
    queue.join()
    out_queue.join()
    if sorted(segment_converted) != start_time_list:
        logging.fatal('Queue ERROR!')
        sys.exit(1)
    output_name = os.path.splitext(os.path.basename(INPUT_FILE))[0] + '_converted'
    concat_file(time_list=segment_converted, filename=output_name)
    logging.info('DONE!')
    print("Elapsed Time: %s" % (time.time() - start))
    if DELETE > 0:
        # concat_file() may already have removed the cut directory; cleaning
        # up step by step makes sure ff.txt is deleted regardless.
        shutil.rmtree(segment_converted_dir, ignore_errors=True)
        shutil.rmtree(segment_cutted_dir, ignore_errors=True)
        try:
            os.remove('ff.txt')
        except OSError:
            pass


#----------------------------------------------------------------------
def main_threading(start_time_list=None, slicer_thread=3, converter_thread=3):
    """Start the worker threads, feed them and wait until all work is done.

    Uses the module-level ``queue`` and ``out_queue`` created by main().
    """
    # spawn a pool of threads, and pass them queue instance
    for _ in range(int(slicer_thread)):
        cutter = CutVideo(queue, out_queue)
        cutter.daemon = True
        cutter.start()
    for _ in range(int(converter_thread)):
        converter = ConvertThread(out_queue)
        converter.daemon = True
        converter.start()
    # populate queue with data
    for start_time in (start_time_list or []):
        queue.put(start_time)
    # wait on the queue until everything has been processed
    queue.join()
    out_queue.join()


#----------------------------------------------------------------------
def usage():
    """Print the command-line help."""
    print('''
    Parallel-Transcode

    https://github.com/cnbeining/parallel-transcode
    http://www.cnbeining.com/

    Beining@ACICFG

    Usage:

    python multi-ffmpeg.py (-h/help) (-i/input-file) (-s/slicer) (-c/converter) (-q/queue-length) (-t/segment-time) (-a/arguments) (-f/full_command) (-d/delete-temp)

    -h: Default: None
        Print this usage file.

    -i: Default: None
        The input file.
        Please make sure it can be read by ffmpeg.

    -s: Default: 3
        The thread number of Slicer that cut the file into loseless pieces.

    -c: Default: 3
        The thread number of Converter that convert the file into pieces.

    -q: Default: 10
        Max pieces.
        TODO

    -t: Default: 60
        The length of one piece, in seconds.

    -a: Default: None
        The arguments you want to pass to ffmpeg when encoding.

    -f: Default: None
        In case you want something special, just input whatever command you want here!
        To make sure the command can be read, use {INPUT_SEGMENT_FILE}, {OUPUT_SEGMENT_FILE} in the command line.

    -d: Default: 1
        If set to 0, Parallel-Transcode will not delete any temporary files.
        1: Only delete at the end of stage.
        2: Delete on the fly.
    ''')


if __name__=='__main__':
    # Defaults for options that may be left out on the command line; the
    # original raised NameError when -s or -c was missing.
    slicer_thread = 3
    converter_thread = 3
    argv_list = sys.argv[1:]
    try:
        opts, args = getopt.getopt(argv_list, "hi:s:c:q:t:a:f:d:",
                                   ['help', "input-file=", 'slicer=',
                                    'converter=', 'queue-length=',
                                    'segment-time=', 'arguments=',
                                    'full_command=', 'delete-temp='])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
    for o, a in opts:
        if o in ('-h', '--help'):
            usage()
            sys.exit()
        if o in ('-i', '--input-file'):
            INPUT_FILE = a
        if o in ('-s', '--slicer'):
            slicer_thread = int(a)
        if o in ('-c', '--converter'):
            converter_thread = int(a)
        if o in ('-q', '--queue-length'):
            queue_length = int(a)  # parsed but not used yet (see usage: TODO)
        if o in ('-t', '--segment-time'):
            SEG_TIME = int(a)
        if o in ('-a', '--arguments'):
            arguments = a
        if o in ('-f', '--full_command'):
            full_command = a
        if o in ('-d', '--delete-temp'):
            DELETE = int(a)
    if not INPUT_FILE:
        # Nothing to do without an input file.
        usage()
        sys.exit(2)
    main(INPUT_FILE, slicer_thread=slicer_thread, converter_thread=converter_thread)