Parallel-Transcode:一个单机并行转码的高适应性测试方案

In case anyone raises this question:
是的，ffmpeg 等工具已经可以很好地处理多进程。
是的，MPP 是处理 AVS 的很好的工具。
但是，对于轻度滤镜，AVS 的性能问题会比较突出。
所以我尝试用这个简单的 Producer-Consumer 模型来解决。
https://github.com/cnbeining/parallel-transcode
代码如下，是晚上写的，比较脏，别吐槽。

#!/usr/bin/env python
#coding:utf-8
# Author:   --<>
# Purpose:
# Created: 11/05/2014
import sys
import unittest
import os
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
import getopt
import logging
import shutil
import subprocess
#import uuid
import threading
import Queue
import time
import traceback
logging.basicConfig(level=logging.INFO)

# Module-level configuration and shared state. The functions and worker
# threads below read these names, and the __main__ section rebinds several
# of them from the command line. (`global` at module scope is a no-op, so
# no such statements are needed here.)

# Working directories.
basedir = os.getcwd()   # launch directory; concat_file() chdirs back here
tmpdir = '.'            # root of the two scratch directories below
segment_cutted_dir = tmpdir + '/segment_cutted_dir'         # lossless cut pieces
segment_converted_dir = tmpdir + '/segment_converted_dir'   # encoded pieces

# Bookkeeping lists and counters.
segment_cutted = []
segment_cutted_error = []
segment_converted = []          # start times of pieces converted successfully
segment_converted_error = []
IS_ERROR = 0                    # failure counter incremented by worker threads

# Settings overridden by command-line options.
INPUT_FILE = ''
SEG_TIME = 60                   # segment length in seconds (-t)
DELETE = 1                      # -d: 0 keep temp files, 1 delete at end, 2 delete on the fly
arguments = ''                  # -a: extra ffmpeg arguments for encoding
full_command = ''               # -f: complete custom encode command template
#----------------------------------------------------------------------
def probe_file(filename):
    """Run ffprobe on *filename* and return its ``-show_format`` report.

    str -> str or None. The report is text with one ``key=value`` entry per
    line (``-pretty`` formatting). Returns None when ffprobe writes anything
    to stderr or exits with a non-zero status; any stderr text is logged.
    Raises OSError if the ffprobe executable cannot be started.
    """
    cmnd = ['ffprobe', '-show_format', '-pretty', '-loglevel', 'quiet', filename]
    # universal_newlines=True makes communicate() return text (not bytes on
    # Python 3), so callers can split the report on '\n'.
    p = subprocess.Popen(cmnd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    out, err = p.communicate()
    # With '-loglevel quiet' ffprobe normally prints nothing when it fails,
    # so the exit status is the reliable failure signal, not stderr alone.
    if err or p.returncode != 0:
        if err:
            logging.error(err)
        return None
    return out
#----------------------------------------------------------------------
def time_to_sec(time_raw):
    """Convert a colon-separated timestamp to whole seconds.

    str -> int. Accepts ``H:MM:SS[.frac]`` and also ``MM:SS[.frac]`` or a
    plain ``SS[.frac]``. Any fractional part of the seconds is truncated
    (ignore .*). Raises ValueError for malformed input or more than three
    fields.
    """
    fields = time_raw.strip().split(':')
    if len(fields) > 3:
        raise ValueError('Unrecognized time value: %r' % (time_raw,))
    total = 0
    # Every field except the last is an integral count in base 60
    # (hours, then minutes).
    for field in fields[:-1]:
        total = total * 60 + int(field)
    return total * 60 + int(float(fields[-1]))
#----------------------------------------------------------------------
def sec_to_time(sec_raw):
    """Format a number of seconds as ``H:M:S``.

    int -> str. Fields are not zero-padded, e.g. 3723 -> '1:2:3'.
    The original body called input() instead of int() and used the builtin
    ``min`` instead of ``minute``, so it always raised.
    """
    hr = int(sec_raw // 3600)
    rest = sec_raw - 3600 * hr
    minute = int(rest // 60)
    sec = rest - 60 * minute
    return '{0}:{1}:{2}'.format(hr, minute, sec)
#----------------------------------------------------------------------
def get_file_time(filename):
    """Return the duration of *filename* as reported by ffprobe.

    str -> str. The value is the raw text after ``duration=`` in the
    probe_file() report (presumably ``H:MM:SS.ffffff`` because of
    ``-pretty`` -- TODO confirm), meant to be passed to time_to_sec().
    Logs a fatal message and exits the process with status 1 if the file
    cannot be probed or the report has no duration entry.
    """
    logging.info('Detecting video info...')
    try:
        report = probe_file(filename)
    except OSError:
        # ffprobe itself could not be launched (e.g. not on PATH).
        report = None
    if report:
        for line in report.split('\n'):
            # Match the key exactly: a substring test would also hit e.g. a
            # "filename=" line whose path happens to contain "duration".
            if line.startswith('duration='):
                return line.split('=', 1)[1].strip()
    logging.fatal('Cannot read video file!')
    #shutil.rmtree(tmpdir)
    # Non-zero status: the plain exit() used before reported success.
    sys.exit(1)
#----------------------------------------------------------------------
def get_extname(input_file):
    """Split *input_file* into a ``(root, extension)`` pair.

    Despite the name, the whole pair from os.path.splitext is returned,
    not only the extension: 'a/b.mp4' -> ('a/b', '.mp4').
    """
    root, ext = os.path.splitext(input_file)
    return root, ext
#----------------------------------------------------------------------
def make_segment_list(input_file_length, step):
    """Return the start time of every segment.

    int, int -> list. Both arguments must be in seconds. The starts are
    0, step, 2*step, ..., and a trailing partial segment gets its own entry.
    """
    count = int(input_file_length / step)
    if input_file_length % step != 0:
        # A remainder means one more (shorter) segment at the end.
        count += 1
    return [index * step for index in range(count)]
#----------------------------------------------------------------------
def cut_one_segment(start_time, input_file = INPUT_FILE, step_time = SEG_TIME):
    """Cut one piece of *input_file* into a lossless H.264 intermediate file.

    int, str, int -> int. start_time and step_time are in seconds, and the
    input file must be readable by ffmpeg. The piece is written to
    ``{segment_cutted_dir}/{start_time}.mp4``, encoded by libx264 with
    ``-qp 0`` (lossless) and the ultrafast preset. Audio is disabled
    (``-an``) in case it gives trouble.
    Returns start_time on success, or -1 if ffmpeg exits with an error.

    Note: the defaults are evaluated once, at definition time, from the
    module-level INPUT_FILE / SEG_TIME; callers pass both explicitly.
    TODO: use the same format as convert_one_segment().
    """
    logging.info('Cutting original file piece, start at {start_time}...'.format(start_time = start_time))
    output_file = '{segment_cutted_dir}/{start_time}.mp4'.format(segment_cutted_dir = segment_cutted_dir, start_time = start_time)
    # An argument list (no shell) passes file names containing spaces or
    # shell metacharacters through untouched.
    # '-ss' before '-i' seeks in the input instead of decoding and
    # discarding everything before start_time for every segment, which is
    # quadratic overall. Per the ffmpeg docs, -accurate_seek (on by default)
    # keeps the cut exact when transcoding.
    # '-y' overwrites a stale piece from an earlier run instead of blocking
    # on ffmpeg's overwrite prompt.
    command = ['ffmpeg', '-y', '-ss', str(start_time), '-i', input_file,
               '-c:v', 'libx264', '-an', '-preset', 'ultrafast', '-qp', '0',
               '-t', str(step_time), output_file]
    # ffmpeg's output goes to os.devnull. subprocess.call() never reads a
    # PIPE, so a chatty ffmpeg would block once the pipe buffer filled.
    with open(os.devnull, 'r+b') as devnull:
        return_code = subprocess.call(command, stdin=devnull, stdout=devnull, stderr=devnull)
    if return_code == 0:
        return start_time
    logging.warning('Segment cut failed! Start time at {start_time}'.format(start_time = start_time))
    return -1
#----------------------------------------------------------------------
def convert_one_segment(start_time, arguments, full_command = ''):
    """Encode one lossless piece into its final form.

    int, str, str -> int. Input: ``{segment_cutted_dir}/{start_time}.mp4``.
    Output: ``{segment_converted_dir}/{start_time}_h264.mp4``.

    If *full_command* is empty, runs
    ``ffmpeg -y -i INPUT {arguments} OUTPUT``. Otherwise *full_command* is
    the whole command line, and the placeholders ``{INPUT_SEGMENT_FILE}``
    and ``{OUPUT_SEGMENT_FILE}`` are replaced by the input and output paths.
    The command runs through the shell because *arguments* and
    *full_command* are user-supplied command-line fragments.

    When DELETE == 2, the input piece is removed after a successful encode.
    Returns start_time on success, -1 if the command exits non-zero.
    """
    logging.info('Converting original file piece, start at {start_time}...'.format(start_time = start_time))
    input_segment_file = '{segment_cutted_dir}/{start_time}.mp4'.format(segment_cutted_dir = segment_cutted_dir, start_time = start_time)
    output_segment_file = '{segment_converted_dir}/{start_time}_h264.mp4'.format(segment_converted_dir = segment_converted_dir, start_time = start_time)
    if full_command == '':
        # '-y': overwrite a stale output from an earlier run instead of
        # stopping at ffmpeg's overwrite prompt.
        command = 'ffmpeg -y -i {INPUT_SEGMENT_FILE} {arguments} {OUPUT_SEGMENT_FILE}'.format(INPUT_SEGMENT_FILE = input_segment_file, OUPUT_SEGMENT_FILE = output_segment_file, arguments = arguments)
    else:
        # The placeholder name keeps its historical spelling: users' -f
        # templates depend on it.
        command = full_command.format(INPUT_SEGMENT_FILE = input_segment_file, OUPUT_SEGMENT_FILE = output_segment_file)
    # Output goes to os.devnull. subprocess.call() never drains a PIPE, so
    # a long encode could fill it and deadlock.
    with open(os.devnull, 'r+b') as devnull:
        return_code = subprocess.call(command, shell=True, stdin=devnull, stdout=devnull, stderr=devnull)
    if return_code != 0:
        logging.warning('Segment convert failed! Start time at {start_time}'.format(start_time = start_time))
        return -1
    logging.info('Segment at {start_time} converted'.format(start_time = start_time))
    if DELETE == 2:
        # "Delete on the fly": the lossless piece is no longer needed.
        try:
            os.remove(input_segment_file)
        except OSError:
            logging.warning('Cannot remove temp file!')
    return start_time
#----------------------------------------------------------------------
def test_if_convert_success(segment_list=None):
    """Return True when exactly the expected segments have been converted.

    segment_list: the expected segment start times (e.g. the output of
    make_segment_list()). Compared order-insensitively against the shared
    segment_converted list, which worker threads fill in completion order.

    Raises TypeError if segment_list is omitted. The previous body read an
    undefined module-level ``segment_list`` and always raised NameError.
    """
    if segment_list is None:
        raise TypeError('test_if_convert_success() requires segment_list')
    return sorted(segment_converted) == sorted(segment_list)
#----------------------------------------------------------------------
def concat_file(time_list = None, filename = 'video_to_convert'):
    """Join the converted pieces, in start-time order, into ``{filename}.mp4``.

    time_list: start times of the converted pieces. Defaults to the shared
    segment_converted list.
    filename: output name without the ``.mp4`` extension.

    Changes the working directory to basedir. Writes the concat list to
    ``ff.txt`` there and joins the pieces with ffmpeg's concat demuxer
    using stream copy. When DELETE > 0, the lossless cut directory is
    removed first. Success or failure is logged, not raised.
    """
    if time_list is None:
        time_list = segment_converted
    os.chdir(basedir)
    cwd = segment_converted_dir
    # Worker threads record pieces in completion order, so sort them
    # chronologically; otherwise the joined video is scrambled.
    ff = ''.join("file '{cwd}/{i}_h264.mp4'\n".format(cwd = cwd, i = i) for i in sorted(time_list))
    with open('ff.txt', 'wb') as f:
        f.write(ff.encode('utf8'))
    if DELETE > 0:
        try:
            shutil.rmtree(segment_cutted_dir)
        except OSError:
            logging.warning('Cannot delete temp dir!')
    logging.info('Concating videos...')
    # '-safe 0': with the default tmpdir the listed paths start with './',
    # which the concat demuxer's default safe mode rejects.
    # No '-y': an existing output file is not silently overwritten.
    # An argument list (no shell) keeps odd characters in filename harmless.
    command = ['ffmpeg', '-f', 'concat', '-safe', '0', '-i', 'ff.txt',
               '-c', 'copy', filename + '.mp4']
    # os.devnull instead of an undrained PIPE, which could deadlock ffmpeg.
    with open(os.devnull, 'r+b') as devnull:
        return_code = subprocess.call(command, stdin=devnull, stdout=devnull, stderr=devnull)
    if return_code == 0:
        logging.info('Concated!')
    else:
        logging.warning('Concat failed')
########################################################################
class CutVideo(threading.Thread):
    """Worker thread that cuts segments (the producer side of the pipeline).

    Takes start times from *queue*, cuts each one with cut_one_segment()
    using the module-level INPUT_FILE and SEG_TIME, and puts the start time
    of every successful cut on *out_queue* for the converter threads.
    Failures increment the global IS_ERROR counter.
    """
    #----------------------------------------------------------------------
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue
    #----------------------------------------------------------------------
    def run(self):
        global IS_ERROR
        while True:
            # Block until a start time is available.
            start_time = self.queue.get()
            try:
                return_value = cut_one_segment(start_time, input_file=INPUT_FILE,
                                               step_time=SEG_TIME)
                if return_value == -1:
                    IS_ERROR += 1
                else:
                    self.out_queue.put(return_value)
            except Exception:
                # Keep the worker alive on an unexpected error. A dead
                # worker would stop consuming items, and the caller's
                # queue.join() would then never return.
                logging.exception('Segment cut crashed! Start time at {start_time}'.format(start_time = start_time))
                IS_ERROR += 1
            finally:
                # Always mark the item done so queue.join() can finish.
                self.queue.task_done()
########################################################################
class ConvertThread(threading.Thread):
    """Worker thread that converts cut segments (the consumer side).

    Takes start times from *out_queue* and encodes each piece with
    convert_one_segment(), using the module-level ``arguments`` and
    ``full_command``. Successful start times are appended to the shared
    segment_converted list. Failures increment the global IS_ERROR counter.
    """
    #----------------------------------------------------------------------
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
    #----------------------------------------------------------------------
    def run(self):
        # Without this declaration, `IS_ERROR += 1` makes IS_ERROR local and
        # raises UnboundLocalError on the first failed conversion.
        global IS_ERROR
        while True:
            start_time = self.out_queue.get()
            try:
                return_value = convert_one_segment(start_time, arguments = arguments, full_command = full_command)
                if return_value == -1:
                    logging.warning('[*] Segment Convertion Failed: Start time {start_time}'.format(start_time = start_time))
                    IS_ERROR += 1
                else:
                    segment_converted.append(start_time)
            except Exception:
                # A dead worker would leave out_queue.join() waiting forever.
                logging.exception('Segment convert crashed! Start time at {start_time}'.format(start_time = start_time))
                IS_ERROR += 1
            finally:
                self.out_queue.task_done()
#----------------------------------------------------------------------
def main(INPUT_FILE, slicer_thread = 3, converter_thread = 3):
    """Run the whole pipeline for INPUT_FILE: probe, cut, convert, concat.

    slicer_thread / converter_thread: number of cutting / converting worker
    threads. The result is written as ``<input name>_converted.mp4``.
    Exits with status 1 if the converted segments do not match the expected
    start times. When DELETE > 0, the temporary directories and ff.txt are
    removed at the end.
    """
    global queue
    global out_queue
    # Worker threads read the module-level INPUT_FILE, not this parameter
    # (which shadows it), so publish the value for them.
    globals()['INPUT_FILE'] = INPUT_FILE
    # Create each directory separately, so an existing one does not skip
    # creating the other.
    for directory in (segment_converted_dir, segment_cutted_dir):
        try:
            os.mkdir(directory)
        except OSError:
            # Most likely it already exists (e.g. left from a DELETE=0 run).
            pass
    start = time.time()
    logging.info('Probing the file...')
    input_file_length = time_to_sec(get_file_time(INPUT_FILE))
    start_time_list = make_segment_list(input_file_length, SEG_TIME)
    logging.info('Start the threading!')
    # Holds at most slicer_thread pending start times; put() blocks beyond that.
    queue = Queue.Queue(int(slicer_thread))
    out_queue = Queue.Queue()
    main_threading(start_time_list=start_time_list, slicer_thread=slicer_thread,
                   converter_thread=converter_thread)
    queue.join()
    out_queue.join()
    if sorted(segment_converted) != start_time_list:
        logging.fatal('Queue ERROR!')
        sys.exit(1)
    # splitext strips only the extension: "my.movie.mkv" -> "my.movie".
    output_name = os.path.splitext(os.path.basename(INPUT_FILE))[0] + '_converted'
    concat_file(time_list=segment_converted, filename=output_name)
    logging.info('DONE!')
    print("Elapsed Time: %s" % (time.time() - start))
    if DELETE > 0:
        # Remove each item independently. concat_file() normally deletes
        # segment_cutted_dir already when DELETE > 0, and that must not
        # prevent ff.txt from being removed.
        shutil.rmtree(segment_converted_dir, ignore_errors=True)
        shutil.rmtree(segment_cutted_dir, ignore_errors=True)
        try:
            os.remove('ff.txt')
        except OSError:
            pass
#----------------------------------------------------------------------
def main_threading(start_time_list = None, slicer_thread = 3, converter_thread = 3):
    """Start the worker threads, feed them, and wait until all work is done.

    Uses the module-level ``queue`` (start times for the slicers) and
    ``out_queue`` (cut pieces for the converters), which the caller must
    create first. start_time_list: segment start times to process
    (default: none). Workers are daemon threads, so they do not keep the
    process alive after the main thread finishes.
    """
    if start_time_list is None:
        start_time_list = []
    # Spawn the pools; each worker loops forever on its queue.
    for _ in range(int(slicer_thread)):
        slicer = CutVideo(queue, out_queue)
        slicer.daemon = True
        slicer.start()
    for _ in range(int(converter_thread)):
        converter = ConvertThread(out_queue)
        converter.daemon = True
        converter.start()
    # Populate the slicer queue (put() may block if the queue is bounded).
    for start_time in start_time_list:
        queue.put(start_time)
    # Slicers put onto out_queue before calling task_done(), so by the time
    # queue.join() returns, every converter task has been enqueued.
    queue.join()
    out_queue.join()
#----------------------------------------------------------------------
def usage():
    """Print the command-line help text to stdout."""
    print('''
    Parallel-Transcode
    https://github.com/cnbeining/parallel-transcode
    http://www.cnbeining.com/
    Beining@ACICFG
    Usage:
    python multi-ffmpeg.py (-h/help) (-i/input-file) (-s/slicer)
                           (-c/converter) (-q/queue-length)
                           (-t/segment-time)
                           (-a/arguments) (-f/full_command)
                           (-d/delete-temp)
    -h: Default: None
        Print this usage file.
    -i: Default: None
        The input file.
        Please make sure it can be read by ffmpeg.
    -s: Default: 3
        The thread number of Slicer that cut the file into lossless pieces.
    -c: Default: 3
        The thread number of Converter that convert the pieces.
    -q: Default: 10
        Max pieces.
        TODO
    -t: Default: 60
        The length of each piece, in seconds.
    -a: Default: None
        The arguments you want to pass to ffmpeg when encoding.
    -f: Default: None
        In case you want something special, just input whatever command you want here!
        To make sure the command can be read,
        use {INPUT_SEGMENT_FILE}, {OUPUT_SEGMENT_FILE} in the command line.
    -d: Default: 1
        If set to 0, Parallel-Transcode will not delete any temporary files.
        1: Only delete at the end of stage.
        2: Delete on the fly.
''')
if __name__=='__main__':
    # Defaults for options passed to main(). Previously these names were
    # unbound, raising NameError unless -s and -c were both given.
    slicer_thread = 3
    converter_thread = 3
    queue_length = 10  # parsed but not used yet (usage lists -q as TODO)
    argv_list = sys.argv[1:]
    try:
        opts, args = getopt.getopt(argv_list, "hi:s:c:q:t:a:f:d:",
                                   ['help', "input-file=", 'slicer=', 'converter=', 'queue-length=', 'segment-time=', 'arguments=', 'full_command=', 'delete-temp='])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
    # Module-level assignments below rebind the globals read by the
    # worker functions (INPUT_FILE, SEG_TIME, arguments, full_command, DELETE).
    for o, a in opts:
        if o in ('-h', '--help'):
            usage()
            sys.exit()
        elif o in ('-i', '--input-file'):
            INPUT_FILE = a
        elif o in ('-s', '--slicer'):
            slicer_thread = int(a)
        elif o in ('-c', '--converter'):
            converter_thread = int(a)
        elif o in ('-q', '--queue-length'):
            queue_length = int(a)
        elif o in ('-t', '--segment-time'):
            SEG_TIME = int(a)
        elif o in ('-a', '--arguments'):
            arguments = a
        elif o in ('-f', '--full_command'):
            full_command = a
        elif o in ('-d', '--delete-temp'):
            DELETE = int(a)
    if not INPUT_FILE:
        # Nothing to do without an input file; show the help instead.
        usage()
        sys.exit(2)
    main(INPUT_FILE, slicer_thread=slicer_thread,
         converter_thread=converter_thread)

 

Leave a Reply

Your email address will not be published. Required fields are marked *