加上可以用滑鼠左鍵拉偵測ROI範圍
執行
python ai_face_stream_native-v4.py
import os
import sys
import time
import numpy as np
import cv2
import threading
import queue

# --- 0. Text-to-speech (TTS) setup ---
# Announcements are pushed onto this queue and spoken by a dedicated worker
# thread so that speech never blocks the video callback.
speech_queue = queue.Queue()
is_face_present = False  # state machine: whether a face is currently considered present
last_face_time = 0       # timestamp of the most recent frame that contained a face
FACE_TIMEOUT = 3.0       # seconds without a face before the "present" state resets


def tts_worker() -> None:
    """Dedicated speech thread; keeps TTS from blocking the video stream.

    On Windows it prefers the native SAPI5 voice via win32com (pyttsx3's
    runAndWait can deadlock); otherwise it falls back to pyttsx3, creating a
    fresh engine per utterance to avoid the same hang. A `None` item on
    `speech_queue` is the shutdown sentinel.
    """
    if os.name == 'nt':
        # Windows: use native SAPI5 directly to avoid pyttsx3's hang bug.
        try:
            import pythoncom
            import win32com.client
            # COM must be initialized inside the thread that uses it.
            pythoncom.CoInitialize()
            speaker = win32com.client.Dispatch("SAPI.SpVoice")
            print("✅ 成功載入 Windows 原生語音引擎 (win32com)")
            while True:
                text = speech_queue.get()
                if text is None:  # shutdown sentinel
                    break
                print(f"🗣️ [語音播報] {text}")
                speaker.Speak(text)
            return
        except Exception as e:
            print(f"⚠️ win32com 語音初始化失敗: {e},將嘗試備用方案...")
    # Fallback path: pyttsx3 (cross-platform).
    try:
        import pyttsx3
    except ImportError:
        print("❌ 找不到語音模組,語音功能停用。")
        return
    while True:
        text = speech_queue.get()
        if text is None:  # shutdown sentinel
            break
        print(f"🗣️ [語音播報] {text}")
        try:
            # Re-init the engine for every utterance and let it be destroyed
            # afterwards; reusing one engine can make runAndWait() hang.
            engine = pyttsx3.init()
            engine.say(text)
            engine.runAndWait()
        except Exception as e:
            print(f"TTS 錯誤: {e}")


# Start the speech thread (daemon: it must not keep the process alive).
tts_thread = threading.Thread(target=tts_worker, daemon=True)
tts_thread.start()

# --- 1. Windows environment setup ---
# Make the GStreamer MSVC runtime DLLs and plugins discoverable before `gi`
# is imported. Assumes the default GStreamer install path — adjust if needed.
if os.name == 'nt':
    gst_root = r"C:\Program Files\gstreamer\1.0\msvc_x86_64"
    path_bin = os.path.join(gst_root, "bin")
    path_plugins = os.path.join(gst_root, "lib", "gstreamer-1.0")
    if os.path.exists(path_bin):
        os.add_dll_directory(path_bin)
        os.environ['PATH'] = path_bin + ";" + os.environ['PATH']
    if os.path.exists(path_plugins):
        os.environ['GST_PLUGIN_PATH'] = path_plugins

try:
    import gi
    gi.require_version('Gst', '1.0')
    from gi.repository import Gst, GLib
except ImportError:
    print("❌ GStreamer 載入失敗")
    sys.exit(1)

Gst.init(None)

# --- 2. Receiver endpoint configuration ---
RECEIVER_IP = "127.0.0.1"
RECEIVER_PORT = 5000

# --- 3. Sender pipeline ---
# appsrc (BGR frames pushed from Python) -> H.264 encode -> RTP -> UDP.
# format=3 is GST_FORMAT_TIME; sync=false sends as fast as frames arrive.
sender_cmd = f"""
appsrc name=mysource format=3 is-live=True !
video/x-raw,format=BGR,width=1280,height=720,framerate=30/1 !
videoconvert !
openh264enc complexity=0 bitrate=3000000 !
rtph264pay config-interval=1 pt=96 !
udpsink host={RECEIVER_IP} port={RECEIVER_PORT} sync=false
"""

print("建立發送管線...")
try:
    sender_pipeline = Gst.parse_launch(sender_cmd)
    sender_src = sender_pipeline.get_by_name("mysource")
    sender_pipeline.set_state(Gst.State.PLAYING)
    print(f"📡 串流發射準備就緒 -> {RECEIVER_IP}:{RECEIVER_PORT}")
except Exception as e:
    print(f"❌ 發送管線建立失敗: {e}")
    sys.exit(1)

# --- 4. Load the AI face-detection model ---
# OpenCV DNN Caffe SSD (res10, 300x300 input); both files must sit next to
# the script.
protoPath = "deploy.prototxt"
modelPath = "res10_300x300_ssd_iter_140000.caffemodel"
if not os.path.exists(protoPath) or not os.path.exists(modelPath):
    print("❌ 找不到模型檔案")
    sys.exit(1)
net = cv2.dnn.readNetFromCaffe(protoPath, modelPath)
print("✅ AI 模型載入成功!")

prev_frame_time = 0  # for FPS calculation in the sample callback

# --- Mouse-drag ROI state and callback ---
drawing = False   # True while the left button is held down (dragging)
ix, iy = -1, -1   # drag anchor point, in full-resolution coordinates

# Default detection region (in 1280x720 full-frame coordinates).
ROI_X1, ROI_Y1 = 320, 120
ROI_X2, ROI_Y2 = 960, 600


def draw_roi(event, x, y, flags, param) -> None:
    """Mouse callback letting the user drag out a custom detection ROI.

    The displayed window shows the frame downscaled by 0.5, so mouse
    coordinates are doubled to map back onto the original 1280x720 frame.
    Updates the global ROI corners; normalization (min/max, clamping) is
    done later in the frame callback.
    """
    global ix, iy, drawing, ROI_X1, ROI_Y1, ROI_X2, ROI_Y2
    # Display image is the half-size `small_frame`; scale back up by 2.
    real_x = x * 2
    real_y = y * 2
    if event == cv2.EVENT_LBUTTONDOWN:
        # Start a new rectangle: collapse both corners onto the click point.
        drawing = True
        ix, iy = real_x, real_y
        ROI_X1, ROI_Y1 = ix, iy
        ROI_X2, ROI_Y2 = ix, iy
    elif event == cv2.EVENT_MOUSEMOVE:
        if drawing:
            # Rubber-band the free corner while dragging.
            ROI_X2, ROI_Y2 = real_x, real_y
    elif event == cv2.EVENT_LBUTTONUP:
        drawing = False
        ROI_X2, ROI_Y2 = real_x, real_y


window_initialized = False        # whether the control-panel window has been created
UI_WINDOW_NAME = 'AI Face Stream'  # single shared UI window name


# --- 5. Core frame-processing callback ---
def on_new_sample(sink):
    """appsink "new-sample" handler: the per-frame processing core.

    For each RGB frame pulled from the camera pipeline it: computes FPS,
    converts to BGR, lazily creates the UI window/trackbars/mouse hook,
    applies the user-tunable adjustments (zoom, RGB gain, saturation,
    sharpness), clamps the drag-selected ROI, runs SSD face detection on
    the ROI, drives the face-present state machine (with TTS announcements),
    draws the overlay, pushes the annotated frame into the sender pipeline,
    and shows a half-size local preview. Always returns Gst.FlowReturn.OK.

    NOTE(review): cv2 GUI calls happen on the GStreamer streaming thread,
    not the main thread — appears to work here but is platform-sensitive.
    """
    global prev_frame_time, is_face_present, last_face_time, window_initialized
    global ROI_X1, ROI_Y1, ROI_X2, ROI_Y2, drawing
    sample = sink.emit("pull-sample")
    buf = sample.get_buffer()
    caps = sample.get_caps()
    height = caps.get_structure(0).get_value('height')
    width = caps.get_structure(0).get_value('width')
    result, mapinfo = buf.map(Gst.MapFlags.READ)
    if result:
        # FPS from inter-frame wall-clock delta (guard against zero delta).
        new_frame_time = time.time()
        fps = 1 / (new_frame_time - prev_frame_time) if (new_frame_time - prev_frame_time) > 0 else 0
        prev_frame_time = new_frame_time

        # Wrap the mapped buffer as an image. cvtColor copies the data, so
        # img_bgr stays valid after buf.unmap() below.
        img_rgb = np.ndarray((height, width, 3), buffer=mapinfo.data, dtype=np.uint8)
        img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)

        # --- Lazy one-time UI initialization ---
        if not window_initialized:
            cv2.namedWindow(UI_WINDOW_NAME, cv2.WINDOW_NORMAL)
            # Extra height leaves room for the trackbars.
            cv2.resizeWindow(UI_WINDOW_NAME, 800, 700)
            # Zoom: 10..30 maps to 1.0x..3.0x
            cv2.createTrackbar('Zoom (x10)', UI_WINDOW_NAME, 10, 30, lambda x: None)
            # Sharpness: 0..10 maps to blend weight 0..1.0
            cv2.createTrackbar('Sharpness', UI_WINDOW_NAME, 0, 10, lambda x: None)
            # Saturation: 0..200 maps to 0.0x..2.0x (100 = neutral)
            cv2.createTrackbar('Saturation %', UI_WINDOW_NAME, 100, 200, lambda x: None)
            # Per-channel gain: 0..200 maps to 0.0x..2.0x (100 = neutral)
            cv2.createTrackbar('R %', UI_WINDOW_NAME, 100, 200, lambda x: None)
            cv2.createTrackbar('G %', UI_WINDOW_NAME, 100, 200, lambda x: None)
            cv2.createTrackbar('B %', UI_WINDOW_NAME, 100, 200, lambda x: None)
            # Register the ROI drag handler.
            cv2.setMouseCallback(UI_WINDOW_NAME, draw_roi)
            window_initialized = True

        # --- Read control-panel values ---
        try:
            zoom_val = max(1.0, cv2.getTrackbarPos('Zoom (x10)', UI_WINDOW_NAME) / 10.0)
            sharp_val = cv2.getTrackbarPos('Sharpness', UI_WINDOW_NAME) / 10.0
            sat_val = cv2.getTrackbarPos('Saturation %', UI_WINDOW_NAME) / 100.0
            r_val = cv2.getTrackbarPos('R %', UI_WINDOW_NAME) / 100.0
            g_val = cv2.getTrackbarPos('G %', UI_WINDOW_NAME) / 100.0
            b_val = cv2.getTrackbarPos('B %', UI_WINDOW_NAME) / 100.0
        except cv2.error:
            # Window may not be fully ready yet; fall back to neutral values.
            zoom_val, sharp_val, sat_val = 1.0, 0.0, 1.0
            r_val, g_val, b_val = 1.0, 1.0, 1.0

        # --- Image adjustments ---
        # 1. Zoom: crop the centre region and scale back to full size.
        if zoom_val > 1.0:
            new_w, new_h = int(width / zoom_val), int(height / zoom_val)
            left = (width - new_w) // 2
            top = (height - new_h) // 2
            cropped = img_bgr[top:top+new_h, left:left+new_w]
            img_bgr = cv2.resize(cropped, (width, height))

        # 2. Per-channel gain (array ordered B,G,R to match OpenCV layout).
        if r_val != 1.0 or g_val != 1.0 or b_val != 1.0:
            img_bgr = np.clip(img_bgr * np.array([b_val, g_val, r_val]), 0, 255).astype(np.uint8)

        # 3. Saturation: scale the S channel in HSV space.
        if sat_val != 1.0:
            hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV).astype(np.float32)
            hsv[:, :, 1] *= sat_val
            hsv[:, :, 1] = np.clip(hsv[:, :, 1], 0, 255)
            img_bgr = cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2BGR)

        # 4. Sharpness: blend the original with a 3x3 sharpening kernel.
        if sharp_val > 0:
            kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
            sharpened = cv2.filter2D(img_bgr, -1, kernel)
            img_bgr = cv2.addWeighted(img_bgr, 1.0 - sharp_val, sharpened, sharp_val, 0)

        # --- Normalize and clamp the ROI ---
        # Order the corners so x1<x2 / y1<y2 (the user may drag right-to-left).
        cur_x1, cur_x2 = min(ROI_X1, ROI_X2), max(ROI_X1, ROI_X2)
        cur_y1, cur_y2 = min(ROI_Y1, ROI_Y2), max(ROI_Y1, ROI_Y2)
        # Clamp to the actual frame bounds.
        cur_x1 = max(0, min(cur_x1, width - 1))
        cur_y1 = max(0, min(cur_y1, height - 1))
        cur_x2 = max(0, min(cur_x2, width - 1))
        cur_y2 = max(0, min(cur_y2, height - 1))
        # Enforce a minimum 10x10 region so the crop/DNN never degenerates.
        if cur_x2 - cur_x1 < 10:
            cur_x2 = cur_x1 + 10 if cur_x1 + 10 < width else cur_x2
        if cur_y2 - cur_y1 < 10:
            cur_y2 = cur_y1 + 10 if cur_y1 + 10 < height else cur_y2
        roi_w = cur_x2 - cur_x1
        roi_h = cur_y2 - cur_y1
        # Crop the region of interest.
        roi_img = img_bgr[cur_y1:cur_y2, cur_x1:cur_x2]

        # --- SSD face detection on the ROI ---
        # Mean values (104, 177, 123) are the res10 model's training means.
        blob = cv2.dnn.blobFromImage(cv2.resize(roi_img, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
        net.setInput(blob)
        detections = net.forward()
        face_count = 0

        # Draw the ROI outline: orange while dragging, blue once released.
        roi_color = (0, 165, 255) if drawing else (255, 0, 0)
        cv2.rectangle(img_bgr, (cur_x1, cur_y1), (cur_x2, cur_y2), roi_color, 2)
        cv2.putText(img_bgr, "Detection Area", (cur_x1, cur_y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, roi_color, 2)

        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.6:  # confidence threshold
                face_count += 1
                # Box is normalized to the ROI; scale to ROI pixel size.
                box = detections[0, 0, i, 3:7] * np.array([roi_w, roi_h, roi_w, roi_h])
                (startX, startY, endX, endY) = box.astype("int")
                # Translate ROI-local coordinates back to full-frame space.
                startX += cur_x1
                startY += cur_y1
                endX += cur_x1
                endY += cur_y1
                text = f"{confidence * 100:.2f}%"
                # Keep the label on-screen when the box touches the top edge.
                y = startY - 10 if startY - 10 > 10 else startY + 10
                cv2.rectangle(img_bgr, (startX, startY), (endX, endY), (0, 255, 0), 2)
                cv2.putText(img_bgr, text, (startX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        current_time = time.time()
        # --- Presence state machine (debounces announcements/flicker) ---
        if face_count > 0:
            status_text = f"Status: Face Detected (Count: {face_count})"
            status_color = (0, 255, 0)  # green
            # Rising edge (no face -> face): announce once via TTS.
            if not is_face_present:
                is_face_present = True
                print(">>> 🟢 狀態切換:進入 ROI,偵測到人臉!")
                speech_queue.put("偵測到人臉")
            # Refresh the last-seen timestamp on every face frame.
            last_face_time = current_time
        else:
            status_text = "Status: No Face"
            status_color = (0, 0, 255)  # red
            # Falling edge only after the face has been gone > FACE_TIMEOUT s.
            if is_face_present and (current_time - last_face_time > FACE_TIMEOUT):
                is_face_present = False
                print(f">>> 🔴 狀態切換:人臉已離開 ROI 超過 {FACE_TIMEOUT} 秒,狀態重置!")

        # Overlay: status, FPS, and usage hint.
        cv2.putText(img_bgr, status_text, (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 1.0, status_color, 3)
        cv2.putText(img_bgr, f"FPS: {int(fps)}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 3)
        cv2.putText(img_bgr, "Tip: Drag mouse to draw Detection Area", (20, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 200), 2)

        # --- Push the annotated frame into the sender pipeline ---
        data = img_bgr.tobytes()
        gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
        gst_buffer.fill(0, data)
        # Propagate the source timestamps so the stream keeps correct timing.
        gst_buffer.pts = buf.pts
        gst_buffer.dts = buf.dts
        gst_buffer.duration = buf.duration
        sender_src.emit("push-buffer", gst_buffer)

        # Local preview at half size (hence the x2 mapping in draw_roi).
        small_frame = cv2.resize(img_bgr, None, fx=0.5, fy=0.5)
        cv2.imshow(UI_WINDOW_NAME, small_frame)
        cv2.waitKey(1)
    buf.unmap(mapinfo)
    return Gst.FlowReturn.OK


# --- 6. Camera receiver pipeline ---
# Windows Media Foundation camera -> decode -> 1280x720 RGB -> appsink,
# which invokes on_new_sample for every frame (drop=True avoids backlog).
cmd = """
mfvideosrc device-index=0 ! decodebin ! videoconvert ! videoscale !
video/x-raw, width=1280, height=720, format=RGB !
appsink name=mysink emit-signals=True drop=True
"""
pipeline = Gst.parse_launch(cmd)
appsink = pipeline.get_by_name("mysink")
appsink.connect("new-sample", on_new_sample)

print("🚀 系統啟動中... (按 Ctrl+C 停止)")
pipeline.set_state(Gst.State.PLAYING)

loop = GLib.MainLoop()
try:
    loop.run()
except KeyboardInterrupt:
    print("停止中...")
    pipeline.set_state(Gst.State.NULL)
    sender_pipeline.set_state(Gst.State.NULL)
    speech_queue.put(None)  # sentinel: stop the TTS thread
    cv2.destroyAllWindows()


留言功能已依作者設定調整顯示方式