# Faces/main.py

import asyncio
import websockets
import ffmpeg
import json
import time
import random
import os
import cv2
import numpy as np
from threading import Lock, Thread
import pytesseract
import face_recognition
# Reference template of the ID card, loaded as grayscale
template = cv2.imread('img/img1.png', 0)
#template = cv2.imread('img/license_small.png', 0)

# Machine-specific paths; adjust for the deployment environment
directoryToSave = "C:/Users/ASUS/Desktop/fishrungame/NEWW/saved"
#directoryToSave = "/home/mephi1984/law-video/law-video/saved"
ffmpegPath = "C:\\Users\\ASUS\\Desktop\\fishrungame\\NEWW\\ffm\\ffmpeg-2024-02-19-git-0c8e64e268-full_build\\bin\\ffmpeg.exe"
#ffmpegPath = "/usr/bin/ffmpeg"
pytesseract.pytesseract.tesseract_cmd = r'C:\Users\ASUS\Desktop\fishrungame\treadBibliotek\tesseract.exe'
if not os.path.exists(directoryToSave):
    os.makedirs(directoryToSave)
def detect_rectangle(frame, template):
    result = cv2.matchTemplate(frame, template, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, max_loc = cv2.minMaxLoc(result)
    # Check whether the best correlation coefficient clears the threshold
    print(max_val)
    if max_val >= 0.2:
        # Define the rectangle covered by the template match
        h, w = template.shape[:2]
        top_left = max_loc
        bottom_right = (top_left[0] + w, top_left[1] + h)
        return True, top_left, bottom_right
    return False, None, None
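
# Usage sketch (the test image name here is hypothetical, not part of this repo):
#   frame = cv2.imread('img/test_frame.png', 0)  # grayscale, like the template
#   found, tl, br = detect_rectangle(frame, template)
#   if found:
#       card = frame[tl[1]:br[1], tl[0]:br[0]]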
def process_image(image, top_left, bottom_right):
    # Crop the part containing "ИДЕНТИФИКАЦИОННАЯ КАРТА" (identification card)
    card_part = image[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]]
    # Run OCR over the crop (Kyrgyz + English + Russian language models)
    custom_config = r'--oem 3 --psm 6 -l kir+eng+rus'
    text = pytesseract.image_to_string(card_part, config=custom_config)
    return text, card_part
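
# Usage sketch: stage 1 below OCRs a whole crop by passing its full extents:
#   text, crop = process_image(object_image, (0, 0),
#                              (object_image.shape[1], object_image.shape[0]))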
def handle_output(out, frame_lock, frame_container):
    # Reader thread: pulls raw BGR frames from ffmpeg's stdout and publishes
    # the most recent one under the shared lock
    print("handle_output start")
    try:
        while True:
            in_bytes = out.read(640 * 480 * 3)
            if not in_bytes:
                break
            with frame_lock:
                frame_container['received_frame'] = np.frombuffer(in_bytes, dtype='uint8').reshape((480, 640, 3))
    finally:
        print("handle_output finished")
async def save_webm_stream(websocket):
    json_data_received = False
    print("save_webm_stream hello")
    random_number = random.randint(0, 999)
    current_video_timestamp = str(round(time.time() * 1000)) + str(random_number).zfill(3)
    # ffmpeg decodes the incoming webm stream and emits raw BGR frames on stdout
    process = (
        ffmpeg
        .input('pipe:', format='webm')
        .output('pipe:', format='rawvideo', pix_fmt='bgr24')
        .run_async(pipe_stdin=True, pipe_stdout=True, cmd=ffmpegPath)
    )
    frame_lock = Lock()
    frame_container = {'received_frame': None}
    thread = Thread(target=handle_output, args=(process.stdout, frame_lock, frame_container))
    thread.start()
    stage = 1
    count = 0
    last_face_location = None
    last_face_frame = None
    try:
        async for message in websocket:
            if not json_data_received:
                # The first message carries a JSON header, a NUL separator,
                # then the first bytes of the webm stream
                json_data, _, binary_data = message.partition(b'\0')
                if json_data:
                    try:
                        json_object = json.loads(json_data.decode('utf-8'))
                        print("JSON data received:", json_object)
                    except (json.JSONDecodeError, UnicodeDecodeError):
                        print("Invalid JSON")
                json_data_received = True
                if binary_data:
                    # Forward only the binary tail; the JSON header is not video data
                    process.stdin.write(binary_data)
                    count += len(binary_data)
            else:
                process.stdin.write(message)
                count += len(message)
            with frame_lock:
                local_frame = frame_container['received_frame']
            if local_frame is None:
                continue
            if stage == 1:
                # Stage 1: look for the ID card template in the current frame
                gray_frame = cv2.cvtColor(local_frame, cv2.COLOR_BGR2GRAY)
                result = cv2.matchTemplate(gray_frame, template, cv2.TM_CCOEFF_NORMED)
                _, max_val, _, max_loc = cv2.minMaxLoc(result)
                await websocket.send(json.dumps({"stage": 1, "match": max_val}))
                if max_val > 0.2:
                    success, top_left, bottom_right = detect_rectangle(gray_frame, template)
                    if success:
                        # Crop the rectangular object at the matched coordinates
                        object_image = local_frame[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]]
                        # Save the crop for inspection
                        cv2.imwrite('object.png', object_image)
                        faces_locations = face_recognition.face_locations(local_frame)
                        if len(faces_locations) == 1:
                            # Unpack the face coordinates and extract the face region
                            top, right, bottom, left = faces_locations[0]
                            face_image = local_frame[top:bottom, left:right]
                            cv2.imwrite('output1.png', local_frame)
                            cv2.imwrite('output2.png', face_image)
                            last_face_location = faces_locations
                            last_face_frame = local_frame.copy()
                            text, _ = process_image(object_image, (0, 0),
                                                    (object_image.shape[1], object_image.shape[0]))
                            # Advance to stage 2 only if the card heading was read
                            if "идентификационная карта" in text.lower():
                                stage = 2
                            else:
                                os.remove('object.png')
                continue
            elif stage == 2:
                # Stage 2: expect two faces and match both against the stage-1 face
                faces_locations = face_recognition.face_locations(local_frame)
                face_count = len(faces_locations)
                face_width1 = 0
                face_width2 = 0
                if face_count == 2:
                    top1, right1, bottom1, left1 = faces_locations[0]
                    top2, right2, bottom2, left2 = faces_locations[1]
                    face_width1 = right1 - left1
                    face_width2 = right2 - left2
                    if (face_width1 > 30) and (face_width2 > 30):
                        face_encodings = [face_recognition.face_encodings(local_frame, [face_location])[0]
                                          for face_location in faces_locations]
                        original_face_encoding = face_recognition.face_encodings(last_face_frame, last_face_location)
                        results = face_recognition.compare_faces(face_encodings, original_face_encoding[0])
                        if all(results):
                            cv2.imwrite('face_all.png', local_frame)
                            for i, (top, right, bottom, left) in enumerate(faces_locations):
                                cv2.imwrite(f'face_{i + 1}.png', local_frame[top:bottom, left:right])
                            await websocket.send(json.dumps({"stage": 3}))
                            break
                await websocket.send(json.dumps({"stage": 2, "faceCount": face_count, "faceWidth1": face_width1, "faceWidth2": face_width2}))
    except websockets.exceptions.ConnectionClosed:
        print("Connection Closed")
    # Closing stdin makes ffmpeg flush and exit, which EOFs its stdout
    # and lets the reader thread finish
    process.stdin.close()
    thread.join()
    #await process.wait()
print("Video server go!")
start_server = websockets.serve(save_webm_stream, "0.0.0.0", 3001)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
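
# Client sketch (assumes the same "JSON header + NUL + webm bytes" framing the
# handler expects; the file path and header fields are illustrative only):
#   async def send_video(path):
#       async with websockets.connect("ws://localhost:3001") as ws:
#           with open(path, 'rb') as f:
#               await ws.send(json.dumps({"client": "test"}).encode() + b'\0' + f.read(4096))
#               while chunk := f.read(4096):
#                   await ws.send(chunk)
#           async for reply in ws:
#               print(reply)  # {"stage": 1, "match": ...}, {"stage": 2, ...}, ...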