# File: gesture_server_mqtt.py - save this file on the server
import paho.mqtt.client as mqtt
import json
import time
# MQTT Configuration
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_TOPIC = "PerintahGestureTubuh"

# Gesture ID -> hex code string sent in the MQTT message.
# Every entry follows the same rule: the code is (gesture ID + 1) written as
# two upper-case hex digits (0 -> "0x01", 9 -> "0x0A", 63 -> "0x40").
# Generating the table covers all 64 gesture IDs instead of a partial list.
GESTURE_HEX_CODES = {
    gesture_id: f"0x{gesture_id + 1:02X}" for gesture_id in range(64)
}

# Gesture ID -> human-readable gesture name.
GESTURE_NAMES = {
    0: "Standing_Neutral",
    1: "Arms_Up",
    2: "Arms_Out",
    3: "Right_Arm_Up",
    # ... fill in the remaining names up to 63
    63: "Celebration",
}
class MQTTGestureClient:
    """MQTT client that publishes gesture commands and executes received ones.

    Once connected, the client subscribes to ``MQTT_TOPIC``. Payloads shaped
    like ``$PerintahGesture,<gesture_name>,<hex_code>`` are translated into a
    machine command; ``send_gesture_command`` publishes messages in the same
    format to the same topic.
    """

    # Prefix that identifies a gesture command payload.
    MESSAGE_HEADER = "$PerintahGesture"

    # Hex code -> machine command. Defined once here rather than being
    # rebuilt on every received message.
    COMMANDS = {
        "0x01": "MACHINE_STANDBY",
        "0x02": "ARM_UP_ALL",
        "0x03": "ARM_EXTEND_ALL",
        # ... add the remaining commands
    }

    def __init__(self, min_confidence=0.7):
        """Create the paho client and register the connection callbacks.

        Args:
            min_confidence: A gesture is published only when its confidence
                is strictly greater than this value (default 0.7).
        """
        self.min_confidence = min_confidence
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the gesture topic when the connection succeeds.

        Args:
            client: The client object passed to the callback.
            userdata: Unused.
            flags: Unused.
            rc: Connection result; 0 is treated as success.
        """
        if rc == 0:
            print("Connected to MQTT Broker!")
            client.subscribe(MQTT_TOPIC)
        else:
            print(f"Failed to connect, return code {rc}")

    def on_message(self, client, userdata, msg):
        """Decode an incoming message and execute the gesture it carries.

        Payloads that do not start with the gesture header are ignored.
        Payloads with the header but not exactly three comma-separated
        fields are reported and skipped.

        Args:
            client: Unused.
            userdata: Unused.
            msg: Received message; ``msg.payload`` must be bytes.
        """
        try:
            payload = msg.payload.decode()
            print(f"Received MQTT message: {payload}")
            if not payload.startswith(self.MESSAGE_HEADER):
                return
            # Strip every field so a trailing "\r\n" or padding added by the
            # sender does not turn a valid hex code into an unknown one.
            parts = [field.strip() for field in payload.split(",")]
            if len(parts) != 3:
                print(f"Ignoring malformed gesture message: {payload!r}")
                return
            _, gesture_name, hex_code = parts
            self.execute_gesture_command(gesture_name, hex_code)
        except Exception as e:
            # Last-resort handler at the callback boundary: report the
            # failure instead of letting it escape from the callback.
            print(f"Error processing MQTT message: {e}")

    def execute_gesture_command(self, gesture_name, hex_code):
        """Execute machine command based on gesture.

        Args:
            gesture_name: Name of the gesture (used for logging only).
            hex_code: Hex code string used to look up the machine command.

        Returns:
            The machine command name, or ``"UNKNOWN_COMMAND"`` when the hex
            code has no entry in ``COMMANDS``.
        """
        print(f"Executing gesture: {gesture_name} with hex code: {hex_code}")
        # Add your machine control logic here
        command = self.COMMANDS.get(hex_code, "UNKNOWN_COMMAND")
        print(f"Machine Command: {command}")
        return command

    def send_gesture_command(self, gesture_id, confidence):
        """Send gesture command via MQTT.

        Unknown gesture IDs are published as ``Unknown`` / ``0x00``.

        Args:
            gesture_id: Key into ``GESTURE_HEX_CODES`` / ``GESTURE_NAMES``.
            confidence: Detection confidence; must exceed the configured
                minimum for the message to be published.

        Returns:
            True when the publish call reported success, False when the
            confidence was too low or the publish call reported an error.
        """
        # Written as "not >" so a NaN confidence is rejected as well.
        if not confidence > self.min_confidence:
            return False
        hex_code = GESTURE_HEX_CODES.get(gesture_id, "0x00")
        gesture_name = GESTURE_NAMES.get(gesture_id, "Unknown")
        message = f"{self.MESSAGE_HEADER},{gesture_name},{hex_code}"
        info = self.client.publish(MQTT_TOPIC, message)
        # Only claim the message was sent when publish reported success.
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            print(f"MQTT publish failed (rc={info.rc}): {message}")
            return False
        print(f"MQTT Sent: {message}")
        return True

    def connect(self):
        """Connect to the broker (keepalive 60) and start the network loop."""
        self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
        self.client.loop_start()

    def disconnect(self):
        """Stop the network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()
# Usage example
def main():
    """Connect to the broker and idle until interrupted with Ctrl+C.

    The client is disconnected in ``finally`` so cleanup also runs when the
    loop is left by an exception other than ``KeyboardInterrupt``.
    """
    mqtt_client = MQTTGestureClient()
    mqtt_client.connect()
    try:
        while True:
            # Simulate gesture detection
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the example; exit quietly.
        pass
    finally:
        mqtt_client.disconnect()


if __name__ == "__main__":
    main()