Gestenerkennung Server

Dieser Teil ist nur optional und noch in Entwicklung und ist nicht als Zielsetzung für das ursprüngliche Projekt anzusehen.

Achtung: Hier werden nur die neuen Teile, die für die Client-Server Verbindung (Auswertung durch Backend) verwendet werden, kommentiert. Die restlichen Funktionen sind in der lokalen Gestenerkennung ausführlich kommentiert.

Sie müssen wie auch für die lokale Gestenerkennung im Terminal mittels "pip install" die Bibliotheken Socket und Pickle installieren.

import cv2 
import mediapipe as mp  
import numpy as np  
import time 
import socket  # Bibliothek für Datenübertragung über Netzwerke
import pickle  # Bibliothek zum übertragen von Python-Objekten, wie z.B. einem Frame

# Initialisierung von Mediapipe Hands
mp_haende = mp.solutions.hands
hand = mp_haende.Hands(max_num_hands=1)
mp_markierung = mp.solutions.drawing_utils

# Initialisierung von Variablen für Handbewegungen
lr = 0
ou = 0
vh = 0
zero = 0
aktive_bewegung = False
alte_pos = None

# Schwellwerte für Bewegungen
min_bewegung = 0.005
fingerabstand_geschlossen = 0.075

# Reverse-Proxy, der sich speichert welcher Client mit dem Server verbunden ist, damit die Daten an einen Client ohne feste IP / Port zurückgeschickt werden können.
def start_proxy(localport, remotehost, remoteport):
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # Erstellen eines TCP-Sockets für die Übertragung der Daten
    serversocket.bind(('0.0.0.0', localport))  # Binden des Sockets an alle IP-Adressen und den lokalen Port
    serversocket.listen(1)  # Prüft auf eine eingehende Verbindungen
    print(f'Proxy listening on port {localport}')

    while True:
        clientsocket, client_address = serversocket.accept()  # Akzeptieren einer eingehenden Verbindung
        print(f'Connection from {client_address}')
        handle_client(clientsocket, remotehost, remoteport)  # Weiterleiten der Verbindung an handle_client

# Wenn der Reverse-Proxy erfolgreich eine Verbindung mit einem Client hergestellt hat, empfängt er Daten vom Client, verarbeitet sie und schickt sie mittels dieser Methode und vorher festgelegtem Reverse-Proxy zurück
def handle_client(clientsocket, remotehost, remoteport):
    remotesocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # Erstellen eines TCP-Sockets
    remotesocket.connect((remotehost, remoteport))  # Verbinden mit dem Remote-Server 

    while True:
        try:
            # Empfängt einen einzelnen Frame vom Client und speichert ihn in der Variable frame
            received = clientsocket.recv(4096)
            frame = pickle.loads(received)  # Zum senden wurde der Frame in Datenpackte geteilt, welche nun wieder zusammengesetzt werden und dann als ganzes in die Variable frame gespeichert werden.

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  
            erkennung = hand.process(frame_rgb)  

            data = [] 

            if erkennung.multi_hand_landmarks:  
                for hand_punkte in erkennung.multi_hand_landmarks:
                    mp_markierung.draw_landmarks(frame, hand_punkte, mp_haende.HAND_CONNECTIONS) 

                    handgelenk_cords = hand_punkte.landmark[mp_haende.HandLandmark.WRIST]
                    aktuelle_pos = np.array([handgelenk_cords.x, handgelenk_cords.y, handgelenk_cords.z])

                    daumen = hand_punkte.landmark[mp_haende.HandLandmark.THUMB_TIP]
                    zeigefinger = hand_punkte.landmark[mp_haende.HandLandmark.INDEX_FINGER_TIP]

                    daumen_zeigefinger_entfernung = np.linalg.norm(
                        np.array([daumen.x, daumen.y, daumen.z]) - np.array([zeigefinger.x, zeigefinger.y, zeigefinger.z])
                    )

                    geschlossen = daumen_zeigefinger_entfernung < fingerabstand_geschlossen  # Überprüfen, ob die Hand geschlossen ist

                    if alte_pos is not None: 
                        bewegung = aktuelle_pos - alte_pos
                        bewegungs_linie = np.linalg.norm(bewegung)

                        aktive_bewegung = bewegungs_linie > min_bewegung  # Überprüfen, ob die Bewegung signifikant ist

                        if aktive_bewegung:
                            lr = 2 if bewegung[0] > 0 else 1
                        else:
                            lr = 0

                        if aktive_bewegung:
                            ou = 2 if bewegung[1] > 0 else 1
                        else:
                            ou = 0

                        if aktive_bewegung:
                            vh = 2 if bewegung[2] > 0 else 1
                        else: 
                            vh = 0

                    geschlossenint = 1 if geschlossen else 2

                    data = [geschlossenint, 0, vh, ou, ou, lr] 

                alte_pos = aktuelle_pos  

            if not data:  # Überprüfen, ob das Array data initialisiert wurde, falls nicht ist die Bedingung wahr und die Schleife wird mittels break abgebrochen
                break

            clientsocket.sendall(pickle.dumps(data))  # Daten an den Client senden

        except Exception as e:
            print(f'Error: {e}')
            break  # Bei einem Fehler wird die Schleife beendet

    # Verbindung zum Client beenden
    clientsocket.close()
    remotesocket.close()

start_proxy(51000, 'server.tesy-robot.de', 80)

Last updated