根據在上一次的

face detect 後,與公司AI部門的同事閒聊後,學到可以設定ROI來設定想要辨識的區域

於是自己回家練習一下,與GEMINI對話後,請 GEMINI base在前一版的基礎下,做到以下功能:

===

加上只在整個畫面的某個區域才偵測人臉

並在畫面上秀出是否偵測到人臉

如果偵測到,就發出語音"偵測到人臉",否則就不要出聲

並秀出偵測到人臉的個數

==


軟體安裝

---

需求
pip install pyttsx3
pip install pywin32
執行
python ai_face_stream_native-v2.py

---


以下為 gemini產生的程式碼

import os
import sys
import time
import numpy as np
import cv2
import threading
import queue

# --- 0. 語音功能設定 (TTS) ---
speech_queue = queue.Queue()
is_face_present = False # 記錄目前是否有人臉的狀態 (狀態機)
last_face_time = 0 # 記錄最後一次看到人臉的時間
FACE_TIMEOUT = 3.0 # 離開畫面超過 3 秒才重置狀態

def tts_worker():
"""獨立的語音執行緒,避免阻塞影像串流"""
if os.name == 'nt':
# Windows 環境:直接使用原生的 SAPI5,避開 pyttsx3 的卡死 Bug
try:
import pythoncom
import win32com.client
# 必須在 Thread 中初始化 COM
pythoncom.CoInitialize()
speaker = win32com.client.Dispatch("SAPI.SpVoice")
print("✅ 成功載入 Windows 原生語音引擎 (win32com)")

while True:
text = speech_queue.get()
if text is None:
break
print(f"🗣️ [語音播報] {text}")
speaker.Speak(text)
return
except Exception as e:
print(f"⚠️ win32com 語音初始化失敗: {e},將嘗試備用方案...")

# 備用方案:pyttsx3
try:
import pyttsx3
except ImportError:
print("❌ 找不到語音模組,語音功能停用。")
return

while True:
text = speech_queue.get()
if text is None:
break
print(f"🗣️ [語音播報] {text}")
try:
# 每次發聲前重新 init,發聲完後銷毀,避免 runAndWait 卡死
engine = pyttsx3.init()
engine.say(text)
engine.runAndWait()
except Exception as e:
print(f"TTS 錯誤: {e}")

# 啟動語音執行緒
tts_thread = threading.Thread(target=tts_worker, daemon=True)
tts_thread.start()

# --- 1. Windows 環境設定 ---
if os.name == 'nt':
gst_root = r"C:\Program Files\gstreamer\1.0\msvc_x86_64"
path_bin = os.path.join(gst_root, "bin")
path_plugins = os.path.join(gst_root, "lib", "gstreamer-1.0")

if os.path.exists(path_bin):
os.add_dll_directory(path_bin)
os.environ['PATH'] = path_bin + ";" + os.environ['PATH']

if os.path.exists(path_plugins):
os.environ['GST_PLUGIN_PATH'] = path_plugins

try:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
except ImportError:
print("❌ GStreamer 載入失敗")
sys.exit(1)

Gst.init(None)

# --- 2. 設定接收端 IP ---
RECEIVER_IP = "127.0.0.1"
RECEIVER_PORT = 5000

# --- 3. 建立發送管線 (Sender Pipeline) ---
sender_cmd = f"""
appsrc name=mysource format=3 is-live=True !
video/x-raw,format=BGR,width=1280,height=720,framerate=30/1 !
videoconvert !
openh264enc complexity=0 bitrate=3000000 !
rtph264pay config-interval=1 pt=96 !
udpsink host={RECEIVER_IP} port={RECEIVER_PORT} sync=false
"""

print("建立發送管線...")
try:
sender_pipeline = Gst.parse_launch(sender_cmd)
sender_src = sender_pipeline.get_by_name("mysource")
sender_pipeline.set_state(Gst.State.PLAYING)
print(f"📡 串流發射準備就緒 -> {RECEIVER_IP}:{RECEIVER_PORT}")
except Exception as e:
print(f"❌ 發送管線建立失敗: {e}")
sys.exit(1)

# --- 4. 載入 AI 模型 ---
protoPath = "deploy.prototxt"
modelPath = "res10_300x300_ssd_iter_140000.caffemodel"
if not os.path.exists(protoPath) or not os.path.exists(modelPath):
print("❌ 找不到模型檔案")
sys.exit(1)

net = cv2.dnn.readNetFromCaffe(protoPath, modelPath)
print("✅ AI 模型載入成功!")

prev_frame_time = 0

# --- 定義偵測區域 (Region of Interest) ---
# 假設原始影像是 1280x720,我們在正中間開一個 640x480 的區域
ROI_X1, ROI_Y1 = 320, 120
ROI_X2, ROI_Y2 = 960, 600
ROI_W = ROI_X2 - ROI_X1
ROI_H = ROI_Y2 - ROI_Y1

# --- 5. 核心處理函數 ---
def on_new_sample(sink):
global prev_frame_time, is_face_present, last_face_time

sample = sink.emit("pull-sample")
buf = sample.get_buffer()
caps = sample.get_caps()
height = caps.get_structure(0).get_value('height')
width = caps.get_structure(0).get_value('width')

result, mapinfo = buf.map(Gst.MapFlags.READ)
if result:
new_frame_time = time.time()
fps = 1 / (new_frame_time - prev_frame_time) if (new_frame_time - prev_frame_time) > 0 else 0
prev_frame_time = new_frame_time

# 轉成圖片
img_rgb = np.ndarray((height, width, 3), buffer=mapinfo.data, dtype=np.uint8)
img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)

# --- 裁切出特定區域 (ROI) ---
# 只把這個區域送給 AI 辨識,可以提升速度並限定偵測範圍
roi_img = img_bgr[ROI_Y1:ROI_Y2, ROI_X1:ROI_X2]

# --- AI 偵測 (針對 ROI) ---
blob = cv2.dnn.blobFromImage(cv2.resize(roi_img, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
net.setInput(blob)
detections = net.forward()

face_count = 0

# 畫出偵測區域的框線 (藍色)
cv2.rectangle(img_bgr, (ROI_X1, ROI_Y1), (ROI_X2, ROI_Y2), (255, 0, 0), 2)
cv2.putText(img_bgr, "Detection Area", (ROI_X1, ROI_Y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

for i in range(0, detections.shape[2]):
confidence = detections[0, 0, i, 2]
if confidence > 0.6:
face_count += 1
# 計算在 ROI 內的座標
box = detections[0, 0, i, 3:7] * np.array([ROI_W, ROI_H, ROI_W, ROI_H])
(startX, startY, endX, endY) = box.astype("int")

# 將 ROI 的座標轉換回「原始全畫面」的座標,才能畫在正確位置
startX += ROI_X1
startY += ROI_Y1
endX += ROI_X1
endY += ROI_Y1

text = f"{confidence * 100:.2f}%"
y = startY - 10 if startY - 10 > 10 else startY + 10
cv2.rectangle(img_bgr, (startX, startY), (endX, endY), (0, 255, 0), 2)
cv2.putText(img_bgr, text, (startX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

current_time = time.time()

# --- 判斷與顯示狀態 (狀態機防閃爍機制) ---
if face_count > 0:
status_text = f"Status: Face Detected (Count: {face_count})"
status_color = (0, 255, 0) # 綠色

# 狀態機:如果原本沒有人臉,現在偵測到了 -> 觸發語音
if not is_face_present:
is_face_present = True
print(">>> 🟢 狀態切換:進入 ROI,偵測到人臉!")
speech_queue.put("偵測到人臉")

# 更新最後看到人臉的時間
last_face_time = current_time
else:
status_text = "Status: No Face"
status_color = (0, 0, 255) # 紅色

# 狀態機:如果原本有人臉,且離開畫面已經超過 FACE_TIMEOUT 秒 -> 重置狀態
if is_face_present and (current_time - last_face_time > FACE_TIMEOUT):
is_face_present = False
print(f">>> 🔴 狀態切換:人臉已離開 ROI 超過 {FACE_TIMEOUT} 秒,狀態重置!")

# 顯示狀態與人臉個數
cv2.putText(img_bgr, status_text, (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 1.0, status_color, 3)
# 顯示 FPS
cv2.putText(img_bgr, f"FPS: {int(fps)}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 3)

# --- 發送串流 ---
data = img_bgr.tobytes()
gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
gst_buffer.fill(0, data)
gst_buffer.pts = buf.pts
gst_buffer.dts = buf.dts
gst_buffer.duration = buf.duration
sender_src.emit("push-buffer", gst_buffer)

# 本地顯示
small_frame = cv2.resize(img_bgr, None, fx=0.5, fy=0.5)
cv2.imshow('Sender (Local)', small_frame)
cv2.waitKey(1)

buf.unmap(mapinfo)

return Gst.FlowReturn.OK

# --- 6. 建立攝影機接收管線 ---
cmd = """
mfvideosrc device-index=0 ! decodebin ! videoconvert ! videoscale ! video/x-raw, width=1280, height=720, format=RGB ! appsink name=mysink emit-signals=True drop=True
"""

pipeline = Gst.parse_launch(cmd)
appsink = pipeline.get_by_name("mysink")
appsink.connect("new-sample", on_new_sample)

print("🚀 系統啟動中... (按 Ctrl+C 停止)")
pipeline.set_state(Gst.State.PLAYING)

loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
print("停止中...")
pipeline.set_state(Gst.State.NULL)
sender_pipeline.set_state(Gst.State.NULL)
speech_queue.put(None) # 關閉語音執行緒
cv2.destroyAllWindows()



執行後,當人臉進去辨識區域ROI後,就會撥放語音 "偵測到人臉"

人臉移出ROI三秒後,再次進入ROI,就會再次撥放語音 "偵測到人臉"

並在畫面上秀出偵測到人臉的數量。


文章標籤
geminiface detect
全站熱搜
創作者介紹
創作者 CuteParrot 的頭像
CuteParrot

馴龍窩

CuteParrot 發表在 痞客邦 留言(0) 人氣(13)