"""Live preview of a V4L2 capture device delivering Y12P frames.

Y12P is 12-bit greyscale with two pixels packed into three bytes. The script
negotiates the format, streams frames through memory-mapped driver buffers,
unpacks each frame to 16-bit samples and shows an 8-bit preview in an OpenCV
window until ESC or Ctrl+C.
"""
import fcntl
import mmap
import os
import time

import cv2
import numpy as np
import v4l2

# FOURCC 'Y12P' ('Y', '1', '2', 'P' packed little-endian)
V4L2_PIX_FMT_Y12P = 0x50323159


def y12p_numpy_reshape(pack_buf: np.ndarray, out: np.ndarray) -> None:
    """Unpack Y12P data into one 12-bit sample per output element.

    Every three packed bytes hold two pixels: byte 0 is the upper 8 bits of
    the first pixel, byte 1 the upper 8 bits of the second pixel, and byte 2
    carries the lower 4 bits of both (first pixel in bits 3:0, second pixel
    in bits 7:4).

    Args:
        pack_buf: Packed bytes without row padding; the caller in this file
            passes a uint8 array. Its size must be a multiple of 3.
        out: Destination array with two elements per packed triplet; filled
            in place (even indices = first pixel, odd indices = second).

    Raises:
        ValueError: If the sizes of ``pack_buf`` and ``out`` do not match.
    """
    if pack_buf.size % 3:
        raise ValueError(
            f"packed Y12P data must be a multiple of 3 bytes, got {pack_buf.size}"
        )
    n_trip = pack_buf.size // 3
    if out.size != 2 * n_trip:
        raise ValueError(
            f"output holds {out.size} samples but the packed data "
            f"contains {2 * n_trip}"
        )

    triplets = pack_buf.reshape(n_trip, 3)
    low_nibbles = triplets[:, 2]
    # Widen to uint16 before shifting so the upper bits are not lost.
    out[0::2] = (triplets[:, 0].astype(np.uint16) << 4) | (low_nibbles & 0x0F)
    out[1::2] = (triplets[:, 1].astype(np.uint16) << 4) | (low_nibbles >> 4)


def _copy_packed_frame(
    mapped, bytes_used: int, stride: int, width: int, height: int
) -> np.ndarray:
    """Copy one Y12P frame out of a driver buffer, dropping row padding.

    Args:
        mapped: Buffer object holding the frame (the mmap of a driver buffer).
        bytes_used: Number of valid bytes the driver reported for the frame.
        stride: Bytes from the start of one row to the next (bytesperline);
            a non-positive value is treated as "rows are tightly packed".
        width: Frame width in pixels; must be even for Y12P.
        height: Frame height in rows.

    Returns:
        A new 1-D uint8 array of ``height * width * 3 // 2`` packed bytes.
        It is a copy, so no view of ``mapped`` outlives this call.

    Raises:
        ValueError: If the geometry is inconsistent or the frame is short.
    """
    if width % 2:
        raise ValueError(f"Y12P needs an even width, got {width}")
    row_bytes = width * 3 // 2
    if stride <= 0:
        stride = row_bytes
    if stride < row_bytes:
        raise ValueError(
            f"stride {stride} is smaller than one packed row ({row_bytes} bytes)"
        )
    # The last row does not need its trailing padding to be present.
    needed = stride * (height - 1) + row_bytes
    if bytes_used < needed:
        raise ValueError(
            f"short frame: driver delivered {bytes_used} bytes, need {needed}"
        )

    # Strided view: each row starts `stride` bytes after the previous one but
    # only the first `row_bytes` of it are pixel data.
    rows = np.ndarray(
        (height, row_bytes), dtype=np.uint8, buffer=mapped, strides=(stride, 1)
    )
    return rows.copy().reshape(-1)


# Hardware parameters
WIDTH = 2464
HEIGHT = 2056
DEV_PATH = "/dev/video0"
BUFFER_COUNT = 4
WIN_NAME = "Y12P RAW Preview"

out_16bit = np.empty(WIDTH * HEIGHT, dtype=np.uint16)
buffers = []
fd = -1
frame_count = 0
start_time = time.time()

try:
    fd = os.open(DEV_PATH, os.O_RDWR, 0)
    cv2.namedWindow(WIN_NAME, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(WIN_NAME, 1280, 960)

    # Request the Y12P format.
    fmt = v4l2.v4l2_format()
    fmt.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
    fmt.fmt.pix.width = WIDTH
    fmt.fmt.pix.height = HEIGHT
    fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_Y12P
    fmt.fmt.pix.field = v4l2.V4L2_FIELD_NONE
    fcntl.ioctl(fd, v4l2.VIDIOC_S_FMT, fmt)

    # Read back what the driver actually applied; S_FMT may adjust the request.
    fcntl.ioctl(fd, v4l2.VIDIOC_G_FMT, fmt)
    if (
        fmt.fmt.pix.pixelformat != V4L2_PIX_FMT_Y12P
        or fmt.fmt.pix.width != WIDTH
        or fmt.fmt.pix.height != HEIGHT
    ):
        raise RuntimeError(
            f"设备未接受请求的格式: 实际 {fmt.fmt.pix.width}x{fmt.fmt.pix.height}, "
            f"pixelformat=0x{fmt.fmt.pix.pixelformat:08X}"
        )
    stride = fmt.fmt.pix.bytesperline
    frame_size = fmt.fmt.pix.sizeimage
    print(f"设备生效参数:行stride={stride}, 单帧总字节={frame_size}")

    # Request MMAP buffers.
    reqbuf = v4l2.v4l2_requestbuffers()
    reqbuf.count = BUFFER_COUNT
    reqbuf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
    reqbuf.memory = v4l2.V4L2_MEMORY_MMAP
    fcntl.ioctl(fd, v4l2.VIDIOC_REQBUFS, reqbuf)

    # Map every granted buffer and queue it; reqbuf.count is the number the
    # driver granted, which may differ from BUFFER_COUNT.
    for idx in range(reqbuf.count):
        buf = v4l2.v4l2_buffer()
        buf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
        buf.memory = v4l2.V4L2_MEMORY_MMAP
        buf.index = idx
        fcntl.ioctl(fd, v4l2.VIDIOC_QUERYBUF, buf)
        map_data = mmap.mmap(fd, buf.length, mmap.MAP_SHARED, offset=buf.m.offset)
        buffers.append((buf, map_data))
        fcntl.ioctl(fd, v4l2.VIDIOC_QBUF, buf)

    # Start streaming; report a start failure with its own message.
    stream_on = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
    try:
        fcntl.ioctl(fd, v4l2.VIDIOC_STREAMON, stream_on)
    except OSError as e:
        raise RuntimeError(f"VIDIOC_STREAMON 启动失败: {e}") from e

    print("采集已启动,按ESC关闭窗口退出\n")

    # Restart the FPS timer here so device setup time is not counted.
    start_time = time.time()
    while True:
        buf = v4l2.v4l2_buffer()
        buf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
        buf.memory = v4l2.V4L2_MEMORY_MMAP
        fcntl.ioctl(fd, v4l2.VIDIOC_DQBUF, buf)

        # Copy the frame out of the driver buffer so the buffer can be
        # re-queued and the mmap closed without outstanding views.
        raw_arr = _copy_packed_frame(
            buffers[buf.index][1], buf.bytesused, stride, WIDTH, HEIGHT
        )
        y12p_numpy_reshape(raw_arr, out_16bit)

        img_12bit = out_16bit.reshape(HEIGHT, WIDTH)
        # Keep the top 8 of the 12 bits for display.
        img_8bit = (img_12bit >> 4).astype(np.uint8)
        cv2.imshow(WIN_NAME, img_8bit)

        frame_count += 1
        now = time.time()
        elapsed = now - start_time
        if elapsed >= 1.0:
            print(f"FPS: {frame_count / elapsed:.1f}")
            frame_count = 0
            start_time = now

        key = cv2.waitKey(1) & 0xFF
        if key == 27:  # ESC
            break

        fcntl.ioctl(fd, v4l2.VIDIOC_QBUF, buf)

except KeyboardInterrupt:
    print("\n收到Ctrl+C停止信号,正在退出...")
except RuntimeError as e:
    print(f"\n采集启动失败: {e}")
finally:
    cv2.destroyAllWindows()
    if fd != -1:
        stream_off = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
        try:
            fcntl.ioctl(fd, v4l2.VIDIOC_STREAMOFF, stream_off)
        except OSError:
            # Best-effort cleanup: streaming may never have been started.
            pass
        for _, map_data in buffers:
            try:
                map_data.close()
            except BufferError:
                # A live view still references the mapping; skip closing it.
                pass
        os.close(fd)
    print("设备与窗口已全部关闭")