Original: Video stream encoding/decoding and storage with OpenCV - 2020.05.28

Author: AlexChung

This article mainly exploits the fact that Python multithreading is well suited to I/O-bound applications.

A basic video processing and communication system typically involves several stages: capture, preprocessing, video encoding, transmission, image processing, and display.

1. Timestamps

A timestamp (time stamp) is the total number of seconds elapsed since 00:00:00 GMT on January 1, 1970 (the Unix epoch, which is 08:00:00 Beijing time), so a timestamp is always greater than or equal to 0. A timestamp is a complete, verifiable piece of data showing that some data existed before a particular point in time. It is usually a character sequence that uniquely identifies a moment, and it was introduced mainly to give users electronic evidence of when their data was produced.

A Python implementation of timestamp conversion:

import time

if __name__ == "__main__":
    print("convert time to stamp:")
    local_time = time.localtime()
    format_local_time = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
    print(format_local_time)
    local_stamp = time.mktime(local_time)
    print(local_stamp)

    print("convert stamp to time:")
    stamp_local_time = time.localtime(local_stamp)
    stamp_format_time = time.strftime("%Y-%m-%d %H:%M:%S", stamp_local_time)
    print(stamp_format_time)

    # get GMT start time by stamp
    print("Greenwich Mean Time start:")
    gmt_start_stamp = 0
    gmt_local_time = time.localtime(gmt_start_stamp)
    gmt_format_time = time.strftime("%Y-%m-%d %H:%M:%S", gmt_local_time)
    print(gmt_format_time)

Example output:

convert time to stamp:
2020-10-21 13:24:17
1603257857.0
convert stamp to time:
2020-10-21 13:24:17
Greenwich Mean Time start:
1970-01-01 08:00:00
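
Note: the epoch prints as 1970-01-01 08:00:00 above because time.localtime renders timestamp 0 in the machine's local timezone (UTC+8 here). Rendering it with time.gmtime instead gives UTC; a minimal sketch:

import time

# timestamp 0 rendered in UTC rather than local time
print(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(0)))  # 1970-01-01 00:00:00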

2. FourCC

FourCC is short for Four-Character Code. In this context each character occupies one byte (8 bits), so a FourCC is 4 bytes (32 bits) long. A FourCC is an identifier for a video codec, compression format, or color/pixel format used in media files.

OpenCV uses FourCC as its codec identifier. For the list of codecs FourCC supports, see fourcc.org.
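
As a sketch of the byte layout: the four characters are packed little-endian into a 32-bit integer, with the first character in the lowest byte. The helper below is hypothetical, for illustration only; it should produce the same value as cv2.VideoWriter_fourcc:

def pack_fourcc(code):
    # pack a 4-character code into a 32-bit integer (little-endian)
    assert len(code) == 4
    return (ord(code[0])
            | (ord(code[1]) << 8)
            | (ord(code[2]) << 16)
            | (ord(code[3]) << 24))

print(pack_fourcc('XVID'))  # same value as cv2.VideoWriter_fourcc(*'XVID')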

3. Video Stream Encoding and Decoding

In practice, one often needs to capture the video stream from a live camera, feed the data to a trained model for prediction, and return the results.

Because video signals carry a large amount of information, they demand high network bandwidth. To save bandwidth and storage, the transmitted stream is generally encoded by a specific codec; a codec's job is essentially image compression. Processing a video stream therefore starts with encoding and decoding it.

The libraries commonly used for image/video processing are ffmpeg and OpenCV, both of which support video stream encoding and decoding. This article uses OpenCV.

Encoding and decoding a video stream in OpenCV takes two steps:

[1] - Use a VideoCapture object to capture the video stream; OpenCV decodes the stream automatically and exposes information such as its codec format and frame rate.

The data sources VideoCapture supports include a device index (e.g., 0 for the local camera), a video file (e.g., a local "*.avi" file), and an RTSP stream (e.g., a Hikvision network camera, "rtsp://{user}:{pwd}@{ip}//Streaming/Channels/{channel}").

[2] - Use VideoWriter_fourcc to get the FourCC byte-code identifier of the target codec, then create a writer with VideoWriter; a minimal sketch of both steps follows.
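
A minimal sketch of the two steps together (the paths are placeholders):

import cv2

# step 1: capture the stream; OpenCV decodes it automatically
cap = cv2.VideoCapture('/path/to/videos/input.avi')
# step 2: pick a target codec and create a writer for the encoded output
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('/path/to/outputs/out.avi', fourcc,
                      cap.get(cv2.CAP_PROP_FPS),
                      (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                       int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))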

4. Getting the Stream's Codec Format

Following the FourCC encoding convention, the codec identifier can be decoded from its packed integer form:

def get_video_format(cap):
    """
    get video format
    """
    raw_codec_format = int(cap.get(cv2.CAP_PROP_FOURCC))
    # unpack the 32-bit FourCC integer into four characters (little-endian)
    decoded_codec_format = (
        chr(raw_codec_format & 0xFF), 
        chr((raw_codec_format & 0xFF00) >> 8),
        chr((raw_codec_format & 0xFF0000) >> 16), 
        chr((raw_codec_format & 0xFF000000) >> 24))
    return decoded_codec_format
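
A usage sketch (the file path is a placeholder):

cap = cv2.VideoCapture('/path/to/videos/rafting.avi')
print(get_video_format(cap))  # e.g. ('X', 'V', 'I', 'D')
cap.release()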

5. Capturing and Displaying Video

Capture the local camera and display it:

stream_path = 0
cap = cv2.VideoCapture(stream_path)
# get fps
fps = cap.get(cv2.CAP_PROP_FPS)
# keep the source frame size as the display size
dst_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
dst_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
wait_time = int(1000 / fps) if fps > 0 else 1  # some cameras report fps=0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break
    frame = cv2.resize(frame, (dst_width, dst_height))
    cv2.imshow("video", frame)
    if cv2.waitKey(wait_time) == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

6. Converting the Video's Codec Format

[1] - Set the codec format (here the target is H.264, whose FourCC byte code is 'X264'):

# format 1
fourcc = cv2.VideoWriter_fourcc('X', '2', '6', '4')  # H.264 codec
# format 2
fourcc = cv2.VideoWriter_fourcc(*'X264')

[2] - Set up the video writer:

out = cv2.VideoWriter(filename=output_video, 
                     fourcc=fourcc, 
                     fps=dst_fps, 
                     frameSize=(dst_width, dst_height),
                     isColor=True)

fourcc sets the codec, fps sets the output frame rate, and frameSize sets the size of each output frame.
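
On some builds an unavailable codec makes the writer fail silently, so it is worth checking that it actually opened. A defensive sketch assuming the variables above; MJPG is just one widely available fallback:

if not out.isOpened():
    # fall back to a broadly supported codec
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    out = cv2.VideoWriter(output_video, fourcc, dst_fps,
                          (dst_width, dst_height), True)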

6.1. Complete Implementation

import os
import cv2


video_dataset = '/path/to/videos'
rafting_video = os.path.join(video_dataset, 'rafting.avi')

output_path = "/path/to/outputs"
output_video = os.path.join(output_path, 'test.avi')


def get_video_format(cap):
    """
    get video format
    """
    raw_codec_format = int(cap.get(cv2.CAP_PROP_FOURCC))
    decoded_codec_format = (
        chr(raw_codec_format & 0xFF), 
        chr((raw_codec_format & 0xFF00) >> 8),
        chr((raw_codec_format & 0xFF0000) >> 16), 
        chr((raw_codec_format & 0xFF000000) >> 24))
    return decoded_codec_format


def convert_video_format(
    video_stream, 
    output_path, 
    dst_height=None, 
    dst_width=None, 
    dst_fps=None,  
    is_show=False):
    #
    cap = cv2.VideoCapture(video_stream)
    # step get video info
    fps = cap.get(cv2.CAP_PROP_FPS)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))

    if dst_height is None:
        dst_height = height
    if dst_width is None:
        dst_width = width

    if dst_fps is None:
        dst_fps = fps

    # fourcc = cv2.VideoWriter_fourcc('a', 'v', 'c', '1')  # avc1 is one of format of h.264
    # fourcc = cv2.VideoWriter_fourcc(*'X264')
    fourcc = cv2.VideoWriter_fourcc('X', '2', '6', '4')  # H.264 codec
    out = cv2.VideoWriter(filename=output_path, 
                         fourcc=fourcc, 
                         fps=dst_fps, 
                         frameSize=(dst_width, dst_height),
                         isColor=True)
    try:
        show_time_per_frame = int(1000 / dst_fps)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break
            frame = cv2.resize(frame, (dst_width, dst_height))
            out.write(frame)
            if is_show:
                cv2.imshow("video", frame)
                if cv2.waitKey(show_time_per_frame) == ord('q'):
                    break
        out.release()
        cap.release()
    except cv2.error as e:
        print(f"Failed to save video, due to {e}")
        raise e


if __name__ == "__main__":
    video_stream = rafting_video
    # get input video info
    cap = cv2.VideoCapture(video_stream)
    # cap.set(cv2.CAP_PROP_FPS, 25)
    # get fps
    fps = cap.get(cv2.CAP_PROP_FPS)
    print(f'Video FPS: {fps}')
    # get frame count
    count_frame = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    print(f'Number frames of video: {count_frame}')
    # get frame height and width
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    print(f'Frame height: {height}')
    print(f'Frame width: {width}')
    # get codec format
    src_codec_format = get_video_format(cap)
    print(f'Video codec format: {src_codec_format}')
    cap.release()
    # convert video format
    convert_video_format(video_stream=video_stream, output_path=output_video, is_show=True)

    # get target video codec format
    cap_output = cv2.VideoCapture(output_video)
    # get codec format
    dst_codec_format = get_video_format(cap_output)
    print(f'Target video codec format: {dst_codec_format}')
    cap_output.release()
    print('Done!')

Example run output:

Video FPS: 25.0
Number frames of video: 248.0
Frame height: 240.0
Frame width: 320.0
Video codec format: ('X', 'V', 'I', 'D')
Can't receive frame (stream end?). Exiting ...
Target video codec format: ('X', '2', '6', '4')
Done!

7. Storing Video Streams

In real scenarios one needs to receive a live video stream and run a trained offline model for prediction, or train online. This raises a thorny problem: the maximum prediction/inference frame rate differs widely between models. For object detection, for example, Faster RCNN runs at roughly 5 FPS on a GPU while YOLO V3 can approach 50 FPS, whereas cameras typically stream at 25 or 30 FPS. This frame-rate mismatch generally makes it impossible to process the stream directly; a 25 FPS stream feeding a 5 FPS detector, for instance, accumulates about 20 unprocessed frames every second.

To deal with this, one can push the stream through Kafka and consume it from there, or first save the video locally and process it afterwards. This article takes the latter approach.

7.1. Saving a Live Stream to a Single File

Saving a live stream to a single file follows the format-conversion code above; simply point the data-source interface at the live stream.

def save_video_stream(video_stream, output_path):
    """
    save online video stream
    """
    cap = cv2.VideoCapture(video_stream)
    # step get video info
    fps = cap.get(cv2.CAP_PROP_FPS)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))

    fourcc = cv2.VideoWriter_fourcc('X', '2', '6', '4')  # H.264 codec
    out = cv2.VideoWriter(
        filename=output_path, 
        fourcc=fourcc, 
        fps=fps, 
        frameSize=(width, height),
        isColor=True)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break
            out.write(frame)
        out.release()
        cap.release()
    except cv2.error as e:
        print(f"Failed to save video, due to {e}")
        raise e
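
A usage sketch (the RTSP URL is a hypothetical instance of the template from Section 3):

save_video_stream('rtsp://user:pwd@192.168.0.64//Streaming/Channels/101',
                  '/path/to/outputs/stream.avi')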

7.2. Saving a Live Stream as Multiple Files

Scenario: saving the stream directly to a single file is simple and works, but the video file then stays occupied by the saving interface, so the video cannot be consumed in real time for further processing or prediction. The stream is therefore saved as small clips instead.

The processing consists of three main steps:

[1] - Create a global buffer queue.

[2] - Write the live stream into the queue (buffer), split into segments.

[3] - Consume the video segments held in the queue and store them to the device.

A multithreaded strategy is used: steps [2] and [3] are each assigned a worker thread to carry out the storage.

[1] - Create the buffer queue (queue.Queue is thread-safe, so the producer and consumer threads can share it without extra locking):

# video buffer
video_buffer = queue.Queue()

[2] - Video buffering (capture the stream and write it into the buffer).

The key question here is how to save 5 seconds of video data as one clip, i.e., how to grab 5 seconds' worth of stream data. A video is fundamentally a sequence of frames, so grabbing 5 seconds of data means grabbing every frame within those 5 seconds; the problem reduces to computing the number of frames in 5 seconds, which follows directly:

$$ num\_frames = fps \times num\_second $$

where fps is the video frame rate and num_second is the clip duration in seconds.

In code:

fps = cap.get(cv2.CAP_PROP_FPS)
# e.g. a 25 FPS stream with 5-second clips gives 25 * 5 = 125 frames per clip
num_frame_per_clip = int(num_second_per_clips * fps)

Each item put into the buffer queue is a pair (tmp_stamp, frames), where tmp_stamp is the timestamp at the start of the clip and frames is the list holding all frames of the clip.

The key buffering code:

global video_buffer

while cap.isOpened():
    frames = []

    tmp_stamp = int(time.mktime(time.localtime()))
    for _ in range(num_frame_per_clip):
        ret, frame = cap.read()
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break
        frame = cv2.resize(frame, (dst_width, dst_height))
        frames.append(frame)
    video_buffer.put((tmp_stamp, frames))
    # buffer exactly one clip per thread invocation
    break

[3] - Video saving (take data from the buffer and write it to local storage).

video_stream is the input video stream; save_path is the device folder where the output clips are saved.

def save_buffer_to_device(video_stream, save_path):
    """
    save buffer video to hardware device
    """
    global video_buffer

    if video_buffer.empty() is False:
        video_name, video = video_buffer.get()
        dst_height = video[0].shape[0]
        dst_width = video[0].shape[1]
        save_video(video_stream, 
                   save_path, 
                   video_name, 
                   video, 
                   dst_height, 
                   dst_width)

[4] - Apply multithreading.

stream_path is the path of the input video stream.

num_second_per_clips is the length of each clip in seconds; the default of 5 saves every 5 seconds of stream data as one clip.

The multithreading uses the high-level concurrent.futures interface.

def save_video_stream(stream_path, output_path, num_second_per_clips=5):
    """
    save video stream
    """

    cap = cv2.VideoCapture(stream_path)
    # each iteration buffers one clip and flushes one buffered clip to disk;
    # the with-block waits for both workers to finish before looping again
    while cap.isOpened():
        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.submit(save_video_to_buffer, cap, num_second_per_clips)
            executor.submit(save_buffer_to_device, stream_path, output_path)
    cap.release()

[5] - Complete implementation:

The output videos are stored with H.264 encoding; the frame rate and frame size match the input stream.

import os
import queue
import cv2
import time
import concurrent.futures


output_path = "/path/to/outputs"

# video buffer
video_buffer = queue.Queue()


def save_video_to_buffer(
    cap, num_second_per_clips=None, frame_height=None, frame_width=None):
    """
    save video to buffer
    """
    global video_buffer

    # step get video info
    fps = cap.get(cv2.CAP_PROP_FPS)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    num_frame_per_clip = int(num_second_per_clips * fps)

    if frame_height is None:
        dst_height = height
    else:
        dst_height = frame_height
    if frame_width is None:
        dst_width = width
    else:
        dst_width = frame_width

    try:
        while cap.isOpened():
            frames = []

            tmp_stamp = int(time.mktime(time.localtime()))
            for _ in range(num_frame_per_clip):
                ret, frame = cap.read()
                if not ret:
                    print("Can't receive frame (stream end?). Exiting ...")
                    break
                frame = cv2.resize(frame, (dst_width, dst_height))
                frames.append(frame)
            video_buffer.put((tmp_stamp, frames))
            # buffer exactly one clip per thread invocation
            break
        # cap.release()
    except cv2.error as e:
        print(f"Failed to save video, due to {e}")
        raise e


def save_video(video_stream, save_path, video_name, video, dst_height, dst_width):
    """
    save one clips video to device
    """
    cap = cv2.VideoCapture(video_stream)
    # step get video info
    fps = cap.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc('X', '2', '6', '4')  # H.264 codec

    video_path = os.path.join(save_path, str(video_name) + '.avi')
    out = cv2.VideoWriter(filename=video_path, 
                          fourcc=fourcc, 
                          fps=fps, 
                          frameSize=(dst_width, dst_height),
                          isColor=True)
    try:
        for i in range(len(video)):
            out.write(video[i])
        out.release()
        cap.release()
    except cv2.error as e:
        print(f"Failed to save video, due to {e}")
        raise e


def save_buffer_to_device(video_stream, save_path):
    """
    save buffer video to hardware device
    """
    global video_buffer

    if video_buffer.empty() is False:
        video_name, video = video_buffer.get()
        dst_height = video[0].shape[0]
        dst_width = video[0].shape[1]
        save_video(video_stream, save_path, 
                   video_name, video, 
                   dst_height, dst_width)


def save_video_stream(stream_path, output_path, num_second_per_clips=5):
    """
    save video stream
    """

    cap = cv2.VideoCapture(stream_path)
    # each iteration buffers one clip and flushes one buffered clip to disk;
    # the with-block waits for both workers to finish before looping again
    while cap.isOpened():
        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.submit(save_video_to_buffer, cap, num_second_per_clips)
            executor.submit(save_buffer_to_device, stream_path, output_path)
    cap.release()


if __name__ == "__main__":
    stream_path = 0
    save_video_stream(stream_path, output_path, num_second_per_clips=5)

7.3. Frame Loss While Saving the Stream: Cause and Fix

[1] - The frame-loss symptom.

Each video clip is named with the timestamp at the start of the clip. Suppose clips are saved at 5-second intervals; in theory the timestamps in consecutive clip names should form an arithmetic sequence with common difference 5, yet the actual difference is 7. That means roughly 2 seconds of frames are lost for every 5 seconds saved, which is clearly a problem.

[2] - The original buffering code responsible for the frame loss:

def save_video_to_buffer(video_stream, num_second_per_clips=None, frame_height=None, frame_width=None):
    """
    save video to buffer
    """
    global video_buffer

    cap = cv2.VideoCapture(video_stream)
    # step get video info
    fps = cap.get(cv2.CAP_PROP_FPS)

    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    num_frame_per_clip = int(num_second_per_clips * fps)

    if frame_height is None:
        dst_height = height
    else:
        dst_height = frame_height
    if frame_width is None:
        dst_width = width
    else:
        dst_width = frame_width

    try:
        while cap.isOpened():
            frames = []

            tmp_stamp = int(time.mktime(time.localtime()))
            for _ in range(num_frame_per_clip):
                ret, frame = cap.read()
                if not ret:
                    print("Can't receive frame (stream end?). Exiting ...")
                    break
                frame = cv2.resize(frame, (dst_width, dst_height))
                frames.append(frame)
            video_buffer.put((tmp_stamp, frames))
            break
        # cap.release()
    except cv2.error as e:
        print(f"Failed to save video, due to {e}")
        raise e

[3] - Measuring the time needed to initialize VideoCapture:

start = time.perf_counter()
cap = cv2.VideoCapture(stream_path)
finish = time.perf_counter()
print(f'Finished in {round(finish - start, 2)} second(s)')  
# Finished in 2.77 second(s) 

Creating the capture object takes, surprisingly, nearly 3 seconds, which accounts for the roughly 2 extra seconds between clip timestamps; fixing the frame loss therefore means changing how the capture object is created.

[4] - The idea behind the fix.

The frames are lost because, each time the buffering thread starts, creating a capture object via VideoCapture costs too much time; frames arriving while the object is being created are never written to the buffer, so they are dropped.

There is in fact no need to create a capture object on every thread launch. Instead, a global capture object can be used, and the buffering stage calls the already-created global capturer. The capture object is thus created once and reused many times, avoiding the repeated creation cost and eliminating the frame loss.

[5] - The optimized buffering code.

Differences from the previous version:

Parameters: cap replaces video_stream, where cap is the global video capture object.

Function body: the capture-creation line cap = cv2.VideoCapture(video_stream) is removed.

def save_video_to_buffer(cap, num_second_per_clips=None, frame_height=None, frame_width=None):
    """
    save video to buffer
    """
    global video_buffer

    # step get video info
    fps = cap.get(cv2.CAP_PROP_FPS)

    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    num_frame_per_clip = int(num_second_per_clips * fps)

    if frame_height is None:
        dst_height = height
    else:
        dst_height = frame_height
    if frame_width is None:
        dst_width = width
    else:
        dst_width = frame_width

    try:
        while cap.isOpened():
            frames = []

            tmp_stamp = int(time.mktime(time.localtime()))
            for _ in range(num_frame_per_clip):
                ret, frame = cap.read()
                if not ret:
                    print("Can't receive frame (stream end?). Exiting ...")
                    break
                frame = cv2.resize(frame, (dst_width, dst_height))
                frames.append(frame)
            video_buffer.put((tmp_stamp, frames))
            break
        # cap.release()
    except cv2.error as e:
        print(f"Failed to save video, due to {e}")
        raise e