趁著有空檔,使用 gemini,請 gemini教我使用 gstreamer 與 gst-launch-1.0

於是就問gemini一個主題,請他寫一個python程式,可以利用 winodws 11上面的 usb camera來做一下人臉辨識的程式

也就是 在 Windows 11 上成功跑通 GStreamer + Python + Webcam + AI 推論迴圈

目前的功能為

.用現代的 深度學習 (Deep Learning, DNN) 模型
deploy.prototxt (模型結構描述)
res10_300x300_ssd_iter_140000.caffemodel (訓練好的權重)

.辨識人臉,並加上框框,並秀出辨識信心度

.左上角秀出 FPS


執行方法 
下載以下兩個檔案並放在同一個目錄下
https://raw.githubusercontent.com/opencv/opencv/master/samples/dnn/face_detector/deploy.prototxt

https://www.google.com/search?q=https://raw.githubusercontent.com/opencv/opencv_3rdparty/dnn_samples_face_detector_20170830/res10_300x300_ssd_iter_140000.caffemodel

python ai_face_stream_native.py

ai_face_stream_native.py 程式碼如下:

import os
import sys
import time
import numpy as np
import cv2

# --- 1. Windows 環境設定 (維持不變) ---
if os.name == 'nt':
gst_root = r"C:\Program Files\gstreamer\1.0\msvc_x86_64"
path_bin = os.path.join(gst_root, "bin")
path_plugins = os.path.join(gst_root, "lib", "gstreamer-1.0")

if os.path.exists(path_bin):
os.add_dll_directory(path_bin)
os.environ['PATH'] = path_bin + ";" + os.environ['PATH']

if os.path.exists(path_plugins):
os.environ['GST_PLUGIN_PATH'] = path_plugins

try:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
except ImportError:
print("❌ GStreamer 載入失敗")
sys.exit(1)

Gst.init(None)

# --- 2. 設定接收端 IP (請確認這裡!) ---
RECEIVER_IP = "127.0.0.1" # 如果是別台電腦,請改 IP
RECEIVER_PORT = 5000

# --- 3. 建立發送管線 (Sender Pipeline) ---
# 這是這段程式碼最核心的改變:我們手動建立一條 "發射" 管線
# appsrc: 允許 Python 把資料 "推" 進去
# openh264enc: 使用 Cisco 開源編碼器 (Windows 預設通常有這個)
# 修改 sender_cmd
# 1. bitrate=3000000: 設定為 3 Mbps (畫質會大幅提升)
# 2. complexity=0: 維持低複雜度以求速度,如果電腦夠快可以改成 1 或 2 換取更好畫質
sender_cmd = f"""
appsrc name=mysource format=3 is-live=True !
video/x-raw,format=BGR,width=1280,height=720,framerate=30/1 !
videoconvert !
openh264enc complexity=0 bitrate=3000000 !
rtph264pay config-interval=1 pt=96 !
udpsink host={RECEIVER_IP} port={RECEIVER_PORT} sync=false
"""

print("建立發送管線...")
try:
sender_pipeline = Gst.parse_launch(sender_cmd)
sender_src = sender_pipeline.get_by_name("mysource")
sender_pipeline.set_state(Gst.State.PLAYING)
print(f"📡 串流發射準備就緒 -> {RECEIVER_IP}:{RECEIVER_PORT}")
except Exception as e:
print(f"❌ 發送管線建立失敗: {e}")
print("請確認是否有安裝 openh264enc (通常包含在 GStreamer 安裝包中)")
sys.exit(1)

# --- 4. 載入 AI 模型 (維持不變) ---
protoPath = "deploy.prototxt"
modelPath = "res10_300x300_ssd_iter_140000.caffemodel"
if not os.path.exists(protoPath) or not os.path.exists(modelPath):
print("❌ 找不到模型檔案")
sys.exit(1)

net = cv2.dnn.readNetFromCaffe(protoPath, modelPath)
print("✅ AI 模型載入成功!")

prev_frame_time = 0

# --- 5. 核心處理函數 ---
def on_new_sample(sink):
global prev_frame_time

sample = sink.emit("pull-sample")
buf = sample.get_buffer()
caps = sample.get_caps()
height = caps.get_structure(0).get_value('height')
width = caps.get_structure(0).get_value('width')

result, mapinfo = buf.map(Gst.MapFlags.READ)
if result:
# 計算 FPS
new_frame_time = time.time()
fps = 1 / (new_frame_time - prev_frame_time) if (new_frame_time - prev_frame_time) > 0 else 0
prev_frame_time = new_frame_time

# 轉成圖片
img_rgb = np.ndarray((height, width, 3), buffer=mapinfo.data, dtype=np.uint8)
img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)

# --- AI 偵測 ---
blob = cv2.dnn.blobFromImage(cv2.resize(img_bgr, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
net.setInput(blob)
detections = net.forward()

for i in range(0, detections.shape[2]):
confidence = detections[0, 0, i, 2]
if confidence > 0.6:
box = detections[0, 0, i, 3:7] * np.array([width, height, width, height])
(startX, startY, endX, endY) = box.astype("int")
text = f"{confidence * 100:.2f}%"
y = startY - 10 if startY - 10 > 10 else startY + 10
cv2.rectangle(img_bgr, (startX, startY), (endX, endY), (0, 255, 0), 2)
cv2.putText(img_bgr, text, (startX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

# 顯示 FPS
cv2.putText(img_bgr, f"FPS: {int(fps)}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 255), 3)

# --- 關鍵修改:發送串流 ---
# 1. 取得 byte 資料
data = img_bgr.tobytes()
# 2. 建立新的 GStreamer Buffer
gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
gst_buffer.fill(0, data)
# 3. 設定 Buffer 的時間戳記 (這對串流很重要)
gst_buffer.pts = buf.pts
gst_buffer.dts = buf.dts
gst_buffer.duration = buf.duration
# 4. 推送到發送管線
sender_src.emit("push-buffer", gst_buffer)

# 本地顯示
small_frame = cv2.resize(img_bgr, None, fx=0.5, fy=0.5)
cv2.imshow('Sender (Local)', small_frame)
cv2.waitKey(1)

buf.unmap(mapinfo)

return Gst.FlowReturn.OK

# --- 6. 建立攝影機接收管線 ---
# 注意:這裡的 width/height 必須跟上面的 sender_cmd 一致 (1280x720)
cmd = """
mfvideosrc device-index=0 ! decodebin ! videoconvert ! videoscale ! video/x-raw, width=1280, height=720, format=RGB ! appsink name=mysink emit-signals=True drop=True
"""

pipeline = Gst.parse_launch(cmd)
appsink = pipeline.get_by_name("mysink")
appsink.connect("new-sample", on_new_sample)

print("🚀 系統啟動中... (按 Ctrl+C 停止)")
pipeline.set_state(Gst.State.PLAYING)

loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
print("停止中...")
pipeline.set_state(Gst.State.NULL)
sender_pipeline.set_state(Gst.State.NULL)
cv2.destroyAllWindows()


執行後,就可以在 windows上面看到偵測畫面了


以上做紀錄並方便日後查詢。

創作者介紹
創作者 CuteParrot 的頭像
CuteParrot

馴龍窩

CuteParrot 發表在 痞客邦 留言(0) 人氣(22)