"""Live preview of a Y10P (MIPI RAW10 packed greyscale) V4L2 capture stream.

The module contains a pure-NumPy Y10P decoder plus a small capture script
that streams frames from a V4L2 device through memory-mapped buffers,
decodes them to 10-bit samples and shows an 8-bit preview in an OpenCV
window until ESC or Ctrl+C is pressed.
"""

import fcntl
import mmap
import os
import time
from typing import List, Optional

import numpy as np

# The camera/GUI bindings are only needed by main().  Tolerating their absence
# keeps the pure-NumPy decoder importable on machines without the camera stack
# (offline decoding, unit tests); main() re-raises the failure before capture.
try:
    import cv2
    import v4l2
except ImportError as _exc:
    cv2 = None
    v4l2 = None
    _CAPTURE_IMPORT_ERROR: Optional[ImportError] = _exc
else:
    _CAPTURE_IMPORT_ERROR = None

# FOURCC for Y10P, defined manually: 'Y' | '1' << 8 | '0' << 16 | 'P' << 24.
V4L2_PIX_FMT_Y10P = 0x50303159

# -------------------------- Hardware configuration --------------------------
WIDTH = 2464
HEIGHT = 2056
DEV_PATH = "/dev/video0"
BUFFER_COUNT = 4
WIN_NAME = "Y10P RAW Preview"


def y10p_numpy_decode_with_padding(
    pack_buf: np.ndarray,
    out: np.ndarray,
    width=2464,
    height=2056,
    stride: Optional[int] = None,
):
    """Unpack a Y10P frame with per-row padding into 10-bit samples.

    Y10P stores every 4 pixels in 5 bytes: bytes 0-3 carry the 8 high bits of
    pixels 0-3, and byte 4 carries their 2 low bits with pixel 0 in bits 1-0
    and pixel 3 in bits 7-6.

    Args:
        pack_buf: uint8 array holding exactly ``stride * height`` bytes.
        out: One-dimensional uint16 array of ``width * height`` elements; it
            is filled in place with values in the range 0..1023, row by row.
        width: Frame width in pixels; must be a positive multiple of 4.
        height: Frame height in rows.
        stride: Bytes per row including padding.  ``None`` keeps the historic
            default of 8 padding bytes after the packed pixel data.

    Raises:
        ValueError: If the geometry is invalid or a buffer does not match it.
    """
    pix_per_group = 4
    byte_per_group = 5

    if width <= 0 or height <= 0 or width % pix_per_group:
        raise ValueError(
            f"width must be a positive multiple of {pix_per_group} and height "
            f"must be positive (got {width}x{height})"
        )

    row_valid_bytes = width // pix_per_group * byte_per_group
    if stride is None:
        stride = row_valid_bytes + 8
    if stride < row_valid_bytes:
        raise ValueError(
            f"stride {stride} is smaller than the {row_valid_bytes} packed "
            f"bytes needed for {width} pixels"
        )

    if pack_buf.dtype != np.uint8:
        raise ValueError(f"pack_buf must be uint8 (got {pack_buf.dtype})")
    if pack_buf.size != stride * height:
        raise ValueError(
            f"帧数据尺寸不匹配stride*height "
            f"(got {pack_buf.size} bytes, expected {stride * height})"
        )
    if out.dtype != np.uint16 or out.ndim != 1 or out.size != width * height:
        raise ValueError(
            f"out must be a 1-D uint16 array of {width * height} elements"
        )

    # Drop the per-row padding, then view the payload as 5-byte groups.
    rows = pack_buf.reshape(height, stride)
    groups = rows[:, :row_valid_bytes].reshape(-1, byte_per_group)
    low_bits = groups[:, 4]

    for pixel in range(pix_per_group):
        high = groups[:, pixel].astype(np.uint16) << 2
        # Pixel N's two low bits live at bit offset 2*N of the fifth byte.
        low = (low_bits >> (2 * pixel)) & 0x03
        out[pixel::pix_per_group] = high | low


def _require_capture_bindings() -> None:
    """Raise ImportError if the camera/GUI bindings failed to import."""
    if _CAPTURE_IMPORT_ERROR is not None:
        raise ImportError(
            "live capture requires the 'v4l2' and 'cv2' modules"
        ) from _CAPTURE_IMPORT_ERROR


def _configure_format(fd: int) -> int:
    """Request the Y10P capture format and return the driver's row stride."""
    fmt = v4l2.v4l2_format()
    fmt.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
    fmt.fmt.pix.width = WIDTH
    fmt.fmt.pix.height = HEIGHT
    fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_Y10P
    fmt.fmt.pix.field = v4l2.V4L2_FIELD_NONE
    fcntl.ioctl(fd, v4l2.VIDIOC_S_FMT, fmt)

    # Read back the parameters the driver actually applied.
    fcntl.ioctl(fd, v4l2.VIDIOC_G_FMT, fmt)
    pix = fmt.fmt.pix
    stride = pix.bytesperline
    frame_size = pix.sizeimage
    print(f"设备生效参数:行stride={stride}, 单帧总字节={frame_size}")

    # S_FMT may adjust the request instead of failing; decoding a different
    # geometry or pixel format would only produce garbage, so stop early.
    negotiated = (pix.width, pix.height, pix.pixelformat)
    if negotiated != (WIDTH, HEIGHT, V4L2_PIX_FMT_Y10P):
        raise RuntimeError(
            f"driver negotiated {pix.width}x{pix.height} "
            f"fourcc=0x{pix.pixelformat:08x}, expected {WIDTH}x{HEIGHT} "
            f"fourcc=0x{V4L2_PIX_FMT_Y10P:08x}"
        )
    return stride


def _map_and_queue_buffers(fd: int, buffers: List[mmap.mmap]) -> None:
    """Request MMAP buffers, map each one and queue it for capture.

    Mappings are appended to ``buffers`` as soon as they exist so the caller
    can still release them if a later step fails.
    """
    reqbuf = v4l2.v4l2_requestbuffers()
    reqbuf.count = BUFFER_COUNT
    reqbuf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
    reqbuf.memory = v4l2.V4L2_MEMORY_MMAP
    fcntl.ioctl(fd, v4l2.VIDIOC_REQBUFS, reqbuf)

    # reqbuf.count now holds the number of buffers the driver granted.
    for idx in range(reqbuf.count):
        buf = v4l2.v4l2_buffer()
        buf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
        buf.memory = v4l2.V4L2_MEMORY_MMAP
        buf.index = idx
        fcntl.ioctl(fd, v4l2.VIDIOC_QUERYBUF, buf)
        map_data = mmap.mmap(
            fd, buf.length, mmap.MAP_SHARED, offset=buf.m.offset
        )
        buffers.append(map_data)
        fcntl.ioctl(fd, v4l2.VIDIOC_QBUF, buf)


def _preview_loop(
    fd: int,
    buffers: List[mmap.mmap],
    out_16bit: np.ndarray,
    stride: Optional[int],
) -> None:
    """Dequeue, decode and display frames until ESC is pressed."""
    frame_count = 0
    window_start = time.monotonic()

    while True:
        buf = v4l2.v4l2_buffer()
        buf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
        buf.memory = v4l2.V4L2_MEMORY_MMAP
        fcntl.ioctl(fd, v4l2.VIDIOC_DQBUF, buf)

        # Copy the frame out so no NumPy view keeps the mmap exported; that
        # allows the mapping to be closed cleanly during shutdown.
        raw_arr = np.frombuffer(
            buffers[buf.index], dtype=np.uint8, count=buf.bytesused
        ).copy()

        # The copy is independent of the driver buffer, so hand the buffer
        # back right away instead of holding it through decode and display.
        fcntl.ioctl(fd, v4l2.VIDIOC_QBUF, buf)

        y10p_numpy_decode_with_padding(
            raw_arr, out_16bit, WIDTH, HEIGHT, stride
        )

        # Drop the two low bits to get an 8-bit image for display.
        img_8bit = (out_16bit.reshape(HEIGHT, WIDTH) >> 2).astype(np.uint8)
        cv2.imshow(WIN_NAME, img_8bit)

        # Report the real frame rate over each window of at least one second.
        frame_count += 1
        now = time.monotonic()
        elapsed = now - window_start
        if elapsed >= 1.0:
            print(f"FPS: {frame_count / elapsed:.1f}")
            frame_count = 0
            window_start = now

        if cv2.waitKey(1) & 0xFF == 27:  # ESC
            break


def _release_device(fd: int, buffers: List[mmap.mmap]) -> None:
    """Stop streaming, unmap all buffers and close the device (best effort)."""
    if fd == -1:
        return

    stream_off = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
    try:
        fcntl.ioctl(fd, v4l2.VIDIOC_STREAMOFF, stream_off)
    except OSError:
        # Streaming may never have started; keep going with the cleanup.
        pass

    for map_data in buffers:
        try:
            map_data.close()
        except BufferError:
            # A live NumPy view still exports this mapping; the OS reclaims
            # it when the process exits.
            pass
    os.close(fd)


def main() -> None:
    """Open the camera, stream Y10P frames and show a live preview."""
    _require_capture_bindings()

    # Preallocated 10-bit output buffer, reused for every frame.
    out_16bit = np.empty(WIDTH * HEIGHT, dtype=np.uint16)
    buffers: List[mmap.mmap] = []
    fd = -1

    try:
        fd = os.open(DEV_PATH, os.O_RDWR, 0)
        cv2.namedWindow(WIN_NAME, cv2.WINDOW_NORMAL)

        stride = _configure_format(fd)
        _map_and_queue_buffers(fd, buffers)

        # Start the capture stream.
        stream_on = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
        fcntl.ioctl(fd, v4l2.VIDIOC_STREAMON, stream_on)
        print("采集已启动,按ESC关闭窗口退出\n")

        # A driver that reports bytesperline == 0 gives no usable stride;
        # fall back to the decoder's built-in default in that case.
        _preview_loop(fd, buffers, out_16bit, stride or None)
    except KeyboardInterrupt:
        print("\n收到Ctrl+C停止信号,正在退出...")
    finally:
        cv2.destroyAllWindows()
        _release_device(fd, buffers)
        print("设备与窗口已全部关闭")


if __name__ == "__main__":
    main()