# vision_publish_red_blob.py — Mac (Python 3)
# Largest-red-blob detection → MQTT command ("left" / "right" / "red" / "lost")
# Also publishes eyaw ∈ [-1,1] and normalized area for future PID use.
import cv2, numpy as np, time, json
import paho.mqtt.client as mqtt
import secrets # expects: mqtt_url, mqtt_username, mqtt_password
# ===== MQTT (HiveMQ Cloud or your broker) =====
BROKER = "860ad10a759d4944a8fb19d293ee23e1.s1.eu.hivemq.cloud"
PORT = 8883
USERNAME = "me352025"
PASSWORD = "cee0preK!"
TOPIC = "/ME35/motorControl"
# ===== Camera & Detection =====
CAM_INDEX = 0 # change to 1 if using iPhone/virtual cam
MIN_PIXELS = 800 # min contour area to consider "seen" (adjust as needed)
BAND_PX = 250 # deadband around center (pixels) → "red" when inside band
LOOP_DELAY = 0.25 #5 FPS
# Morphology kernel
KERNEL = np.ones((5, 5), np.uint8)
# ----- MQTT setup -----
client = mqtt.Client(client_id="mac--pub", clean_session=True)
client.username_pw_set(USERNAME, PASSWORD)
client.tls_set() # use system CA
client.tls_insecure_set(False)
client.connect(BROKER, PORT, keepalive=30)
client.loop_start()
# ----- Camera -----
cam = cv2.VideoCapture(CAM_INDEX, cv2.CAP_AVFOUNDATION)
if not cam.isOpened():
cam = cv2.VideoCapture(CAM_INDEX)
if not cam.isOpened():
raise SystemExit(f"Error: Could not open camera index {CAM_INDEX}")
try:
while True:
ok, frame = cam.read()
if not ok:
print("Error: Failed to capture frame")
break
h, w = frame.shape[:2]
frame_center = w // 2
# --- Red extraction: R - G - B ---
b, g, r = cv2.split(frame)
red_filtered = cv2.subtract(r, g)
red_filtered = cv2.subtract(red_filtered, b)
# --- Threshold to binary mask ---
_, red_mask = cv2.threshold(red_filtered, 50, 255, cv2.THRESH_BINARY)
# --- Morphological cleanup ---
red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_OPEN, KERNEL)
red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_CLOSE, KERNEL)
# --- Find largest contour ---
contours, _ = cv2.findContours(red_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
mode = "lost"
eyaw = None
area_norm = 0.0
result = frame.copy()
if contours:
biggest = max(contours, key=cv2.contourArea)
area = float(cv2.contourArea(biggest))
if area > MIN_PIXELS:
x, y, bw, bh = cv2.boundingRect(biggest)
cv2.rectangle(result, (x, y), (x + bw, y + bh), (255, 0, 0), 2)
cv2.drawContours(result, [biggest], -1, (0, 255, 0), 2)
M = cv2.moments(biggest)
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
cv2.circle(result, (cx, cy), 7, (255, 255, 255), -1)
# Decide discrete mode by band
if cx < frame_center - BAND_PX:
mode = "left"
elif cx > frame_center + BAND_PX:
mode = "right"
else:
mode = "red"
# Normalized lateral error for PID (optional)
eyaw = (cx - frame_center) / (w / 2.0)
# Normalized area as distance proxy
area_norm = area / float(w * h)
# HUD text
cv2.putText(result, f"Area: {int(area)} eyaw: {0 if eyaw is None else eyaw:.2f}",
(x, max(20, y - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
# Center lines / band
cv2.line(result, (frame_center, 0), (frame_center, h), (0, 255, 255), 1)
cv2.line(result, (frame_center - BAND_PX, 0), (frame_center - BAND_PX, h), (255, 255, 0), 1)
cv2.line(result, (frame_center + BAND_PX, 0), (frame_center + BAND_PX, h), (255, 255, 0), 1)
# --- Publish decision ---
payload = {"mode": mode, "eyaw": eyaw, "area": area_norm, "ts": time.time()}
client.publish(TOPIC, json.dumps(payload), qos=0, retain=False)
print("Sent:", payload)
# --- Visualization ---
cv2.imshow("Original Camera", frame)
cv2.imshow("Red Mask", red_mask)
cv2.imshow("Biggest Red Object", result)
if (cv2.waitKey(1) & 0xFF) == ord('q'):
break
time.sleep(LOOP_DELAY)
finally:
cam.release()
cv2.destroyAllWindows()
client.loop_stop()
client.disconnect()
# esp32_listen_red.py — MicroPython on Robo ESP32
import network, time, json
from machine import Pin, PWM
from umqtt.simple import MQTTClient
import secrets
from motordriverfollow import Motor, Robot # <-- keep your motor driver file unchanged
# Motor1 = Motor(14, 27, 25, 26) # Right
# Motor2 = Motor(12, 13, 21, 22)
ourRobot = Robot(14, 27, 25, 26, 13, 12, 21, 22)
# m1a, m1b, m1e1, m1e2, m2a, m2b, m2e1, m2e2):
class BuzzerListener:
def __init__(self):
# Wi-Fi (open network, no password)
self.SSID = secrets.SSID # e.g. "Tufts_Wireless"
# MQTT settings
self.MQTT_BROKER = secrets.mqtt_url
self.MQTT_PORT = 8883
self.MQTT_USERNAME = secrets.mqtt_username
self.MQTT_PASSWORD = secrets.mqtt_password
self.TOPIC_SUB = "/ME35/motorControl"
self.CLIENT_ID = "esp32-buzzer"
# Buzzer (optional)
self.BUZZER_PIN = 23
self.buzzer = PWM(Pin(self.BUZZER_PIN), freq=2000, duty=0)
self.client = None
self.last_ping_ms = time.ticks_ms()
# ---------------- Wi-Fi ----------------
def connect_wifi(self):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
print("Connecting to WiFi:", self.SSID)
wlan.connect(self.SSID) # no password
while not wlan.isconnected():
time.sleep_ms(300)
print("WiFi connected! IP:", wlan.ifconfig()[0])
# ---------------- MQTT ----------------
def mqtt_connect(self):
self.client = MQTTClient(
client_id=self.CLIENT_ID,
server=self.MQTT_BROKER,
port=self.MQTT_PORT,
user=self.MQTT_USERNAME,
password=self.MQTT_PASSWORD,
ssl=True,
ssl_params={'server_hostname': self.MQTT_BROKER}
)
self.client.set_callback(self.on_message)
self.client.connect()
self.client.subscribe(self.TOPIC_SUB)
print("Subscribed to", self.TOPIC_SUB)
def mqtt_reconnect(self):
try:
if self.client:
try:
self.client.disconnect()
except:
pass
self.mqtt_connect()
except Exception as e:
print("Re-connect failed:", e)
def on_message(self, topic, msg):
"""Called when a message arrives on /ME35/motorControl"""
try:
data = json.loads(msg)
except Exception:
return
mode = str(data.get("mode", "")).lower().strip()
print(mode)
if mode == "red":
print("🔴 Red detected — move!")
ourRobot.robotForward()
elif mode == "left":
print("Left detected")
ourRobot.robotLeft()
elif mode == "right":
print("Right detected")
ourRobot.robotRight()
elif mode == "lost":
print("lost detected")
ourRobot.robotStopboo()
# ---------------- Main loop ----------------
def run(self):
self.connect_wifi()
self.mqtt_connect()
print("Ready. Waiting for MQTT messages…")
while True:
try:
self.client.check_msg() # non-blocking
except Exception as e:
print("MQTT error:", e)
self.mqtt_reconnect()
# Keep TLS alive (prevents the 20–30s dropout)
now = time.ticks_ms()
if time.ticks_diff(now, self.last_ping_ms) > 25000:
try:
self.client.ping()
except Exception as e:
print("Ping failed:", e)
self.mqtt_reconnect()
self.last_ping_ms = now
time.sleep_ms(50)
# --- main ---
BuzzerListener().run()