Chapter 14: Human-Robot Interaction
What You'll Learn
- Natural Language Processing for Robots
- Gesture Recognition and Generation
- Social Cognition and Theory of Mind
- Collaborative Task Planning
- Emotional Intelligence and Expression
- Ethical Considerations in HRI
Introduction
Human-Robot Interaction (HRI) focuses on creating robots that can effectively and naturally interact with humans. For humanoid robots, this involves not only understanding human communication but also responding in socially appropriate ways. This chapter explores the technologies and approaches that enable seamless human-robot collaboration.
1. Natural Language Processing for Robots
1.1 Speech Recognition and Understanding
import time

import numpy as np
import spacy
import speech_recognition as sr
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
class SpeechProcessor:
    """Capture speech from the microphone and convert it into structured
    intent/entity information for downstream dialogue handling."""

    def __init__(self):
        self.recognizer = sr.Recognizer()
        self.microphone = sr.Microphone()
        self.nlp = spacy.load("en_core_web_lg")
        # Intent classification model.
        # NOTE(review): DialoGPT-medium is a conversational LM, not a
        # sequence-classification checkpoint, and this pipeline is never
        # used (classify_intent below is keyword-based) -- confirm before
        # relying on it.
        self.intent_classifier = pipeline(
            "text-classification",
            model="microsoft/DialoGPT-medium"
        )
        # Calibrate the recognizer against ambient noise once at start-up.
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(source)

    def listen_for_speech(self, timeout=5, phrase_time_limit=10):
        """Listen on the microphone and return the recognized text.

        Returns None on timeout, unintelligible audio, or API failure.
        """
        try:
            with self.microphone as source:
                print("Listening...")
                audio = self.recognizer.listen(
                    source,
                    timeout=timeout,
                    phrase_time_limit=phrase_time_limit
                )
            # Recognize speech via Google's web speech API.
            text = self.recognizer.recognize_google(audio)
            print(f"Recognized: {text}")
            return text
        except sr.WaitTimeoutError:
            print("Listening timeout")
            return None
        except sr.UnknownValueError:
            print("Could not understand audio")
            return None
        except sr.RequestError as e:
            print(f"Could not request results: {e}")
            return None

    def extract_intent_and_entities(self, text):
        """Return a dict with intent, entities, action keywords and the raw
        text, or None when *text* is empty.

        Bug fix: the empty-text path previously returned the tuple
        ``(None, [])``, which is truthy and inconsistent with the dict
        returned on success, so callers testing ``if not result`` would
        proceed and crash on dict access.
        """
        if not text:
            return None
        # Parse with spaCy (tokenization + NER).
        doc = self.nlp(text)
        # Named entities found by spaCy's NER.
        entities = []
        for ent in doc.ents:
            entities.append({
                'text': ent.text,
                'label': ent.label_,
                'start': ent.start_char,
                'end': ent.end_char
            })
        # Keyword-based intent classification (see classify_intent).
        intent = self.classify_intent(text)
        # Keywords that map to robot actions, objects and locations.
        action_keywords = self.extract_action_keywords(doc)
        return {
            'intent': intent,
            'entities': entities,
            'action_keywords': action_keywords,
            'text': text
        }

    def classify_intent(self, text):
        """Classify user intent with a simple substring keyword lookup.

        Returns the first matching intent in dict order, or 'general'
        when nothing matches.
        """
        # Common intents for humanoid robots.
        intents = {
            'greeting': ['hello', 'hi', 'hey', 'good morning', 'good evening'],
            'request': ['please', 'can you', 'could you', 'would you', 'help'],
            'question': ['what', 'where', 'when', 'why', 'how', 'who'],
            'command': ['do', 'make', 'get', 'bring', 'take', 'move'],
            'gratitude': ['thank', 'thanks', 'appreciate'],
            'farewell': ['bye', 'goodbye', 'see you', 'good night']
        }
        text_lower = text.lower()
        for intent, keywords in intents.items():
            if any(keyword in text_lower for keyword in keywords):
                return intent
        return 'general'

    def extract_action_keywords(self, doc):
        """Extract action verbs, known objects and known locations from a
        spaCy doc as a list of ``{'word', ..., 'type'}`` dicts."""
        action_keywords = []
        # Action verbs, matched on lemma so inflections also hit.
        action_verbs = ['pick', 'place', 'move', 'bring', 'take', 'give', 'show', 'point', 'walk']
        for token in doc:
            if token.lemma_ in action_verbs:
                action_keywords.append({
                    'word': token.text,
                    'lemma': token.lemma_,
                    'type': 'action'
                })
        # Known manipulable objects (matched only against NER spans).
        objects = ['cup', 'bottle', 'ball', 'book', 'phone', 'remote', 'keys']
        for ent in doc.ents:
            if ent.text.lower() in objects:
                action_keywords.append({
                    'word': ent.text,
                    'type': 'object'
                })
        # Known locations (matched only against NER spans).
        locations = ['table', 'counter', 'floor', 'kitchen', 'living room', 'bedroom']
        for ent in doc.ents:
            if ent.text.lower() in locations:
                action_keywords.append({
                    'word': ent.text,
                    'type': 'location'
                })
        return action_keywords
1.2 Dialogue Management
class DialogueManager:
    """Track conversation context and generate template-based responses
    from the structured output of SpeechProcessor."""

    def __init__(self):
        self.context = {}
        self.conversation_history = []
        self.user_profile = {}
        # Response templates keyed by intent/situation category.
        # Bug fix: a 'farewell' category was missing even though
        # generate_response selects it, which raised KeyError.
        self.response_templates = {
            'greeting': [
                "Hello! How can I help you today?",
                "Hi there! What can I do for you?",
                "Greetings! How may I assist you?"
            ],
            'request': [
                "I'll be happy to help with that.",
                "Of course! Let me take care of that for you.",
                "Certainly! I can help you with that task."
            ],
            'confirmation': [
                "Just to confirm, you want me to {}?",
                "So you'd like me to {}? Is that correct?",
                "I understand you want me to {}?"
            ],
            'clarification': [
                "I'm not sure I understand. Could you clarify?",
                "Could you please repeat that or provide more details?",
                "I'm not sure what you mean. Can you explain differently?"
            ],
            'completion': [
                "Task completed! Is there anything else I can help with?",
                "Done! What would you like me to do next?",
                "Finished! How else can I assist you?"
            ],
            'farewell': [
                "Goodbye! Have a great day!",
                "See you later! I'll be here if you need anything.",
                "Farewell! It was a pleasure helping you."
            ]
        }

    def process_input(self, speech_result):
        """Process one user turn and return the robot's response.

        *speech_result* is the dict produced by
        SpeechProcessor.extract_intent_and_entities; returns None when it
        is falsy.
        """
        if not speech_result:
            return None
        # Remember the latest turn's analysis for contextual decisions.
        self.context['last_intent'] = speech_result['intent']
        self.context['last_entities'] = speech_result['entities']
        self.context['last_actions'] = speech_result['action_keywords']
        # Log the user turn.
        self.conversation_history.append({
            'user': speech_result['text'],
            'intent': speech_result['intent'],
            'entities': speech_result['entities'],
            'timestamp': time.time()
        })
        response = self.generate_response(speech_result)
        # Log the robot turn.
        self.conversation_history.append({
            'robot': response,
            'timestamp': time.time()
        })
        return response

    def generate_response(self, speech_result):
        """Generate an appropriate response based on the detected intent."""
        intent = speech_result['intent']
        if intent == 'greeting':
            return self.select_template('greeting')
        elif intent == 'request':
            # Only accept the request when it carries enough information.
            if self.validate_request(speech_result):
                return self.select_template('request')
            else:
                return self.select_template('clarification')
        elif intent == 'question':
            return self.answer_question(speech_result)
        elif intent == 'command':
            return self.handle_command(speech_result)
        elif intent == 'gratitude':
            return "You're welcome! It's my pleasure to help."
        elif intent == 'farewell':
            return self.select_template('farewell')
        else:
            return self.select_template('clarification')

    def validate_request(self, speech_result):
        """Return True when the request names both an action and an object."""
        action_keywords = speech_result['action_keywords']
        entities = speech_result['entities']
        has_action = any(kw['type'] == 'action' for kw in action_keywords)
        has_object = any(kw['type'] == 'object' for kw in action_keywords)
        has_object_entity = any(ent['label'] in ['OBJECT', 'PRODUCT'] for ent in entities)
        return has_action and (has_object or has_object_entity)

    def select_template(self, category):
        """Pick a random response template from *category*."""
        import random
        return random.choice(self.response_templates[category])

    def answer_question(self, speech_result):
        """Answer user questions with simple rule-based responses.

        This would integrate with a knowledge base or Q&A system in a
        full implementation.
        """
        question_text = speech_result['text'].lower()
        if 'name' in question_text:
            return "I'm a humanoid robot designed to assist and interact with humans."
        elif 'help' in question_text:
            return ("I can help you with various tasks like picking up objects, "
                    "moving items, providing information, and assisting with daily activities. "
                    "Just tell me what you need!")
        elif 'how are you' in question_text:
            return ("I'm functioning optimally! My systems are all running smoothly, "
                    "and I'm ready to assist you with any task.")
        else:
            return self.select_template('clarification')

    def handle_command(self, speech_result):
        """Handle a command by confirming the action/object pair, or ask
        for clarification when either is missing."""
        action_keywords = speech_result['action_keywords']
        action = None
        obj = None
        for kw in action_keywords:
            if kw['type'] == 'action':
                action = kw['lemma']
            elif kw['type'] == 'object':
                obj = kw['word']
        if action and obj:
            return self.select_template('confirmation').format(f"{action} the {obj}")
        else:
            return self.select_template('clarification')

    def get_conversation_summary(self):
        """Return a text summary of the recent conversation."""
        if not self.conversation_history:
            return "No conversation history"
        # Last 10 history entries = roughly the last 5 user/robot exchanges.
        recent_turns = self.conversation_history[-10:]
        summary = "Recent conversation:\n"
        for turn in recent_turns:
            if 'user' in turn:
                summary += f"User: {turn['user']}\n"
            else:
                summary += f"Robot: {turn['robot']}\n"
        return summary
2. Gesture Recognition and Generation
2.1 Gesture Recognition
import cv2
import mediapipe as mp
import numpy as np
class GestureRecognizer:
    """Recognize hand and body gestures from video frames using MediaPipe
    Hands and Pose.

    All detectors work on normalized landmark coordinates where y grows
    DOWNWARD (MediaPipe convention), so "finger extended upward" means
    ``tip.y < mcp.y``.
    """

    def __init__(self):
        self.mp_hands = mp.solutions.hands
        self.mp_pose = mp.solutions.pose
        self.mp_drawing = mp.solutions.drawing_utils
        # Video-mode models with moderate confidence thresholds.
        self.hands = self.mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        # Gesture name -> detector callable taking hand landmarks.
        self.gestures = {
            'pointing': self.detect_pointing,
            'waving': self.detect_waving,
            'thumbs_up': self.detect_thumbs_up,
            'open_palm': self.detect_open_palm,
            'fist': self.detect_fist,
            'peace': self.detect_peace
        }

    def recognize_gestures(self, frame):
        """Detect gestures in a BGR video frame.

        Returns the annotated frame and a list of recognized gesture dicts.
        """
        # MediaPipe expects RGB input.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        hand_results = self.hands.process(rgb_frame)
        pose_results = self.pose.process(rgb_frame)
        recognized_gestures = []
        # Hand gestures, one pass per detected hand.
        if hand_results.multi_hand_landmarks:
            for hand_idx, hand_landmarks in enumerate(hand_results.multi_hand_landmarks):
                handedness = hand_results.multi_handedness[hand_idx].classification[0].label
                # Draw hand landmarks on the output frame.
                self.mp_drawing.draw_landmarks(
                    frame, hand_landmarks, self.mp_hands.HAND_CONNECTIONS
                )
                # Run every registered detector against this hand.
                for gesture_name, detect_func in self.gestures.items():
                    if detect_func(hand_landmarks):
                        recognized_gestures.append({
                            'type': gesture_name,
                            'hand': handedness.lower(),
                            'confidence': 0.8,
                            'landmarks': hand_landmarks
                        })
        # Full-body pose gestures.
        if pose_results.pose_landmarks:
            self.mp_drawing.draw_landmarks(
                frame, pose_results.pose_landmarks, self.mp_pose.POSE_CONNECTIONS
            )
            pose_gestures = self.detect_pose_gestures(pose_results.pose_landmarks)
            recognized_gestures.extend(pose_gestures)
        return frame, recognized_gestures

    def detect_pointing(self, hand_landmarks):
        """Detect pointing: index extended, other fingers curled."""
        index_tip = hand_landmarks.landmark[8]    # Index finger tip
        index_mcp = hand_landmarks.landmark[5]    # Index finger MCP
        middle_tip = hand_landmarks.landmark[12]  # Middle finger tip
        middle_mcp = hand_landmarks.landmark[9]   # Middle finger MCP
        # Index finger extended upward.
        index_extended = index_tip.y < index_mcp.y
        # Middle finger curled.
        middle_not_extended = middle_tip.y > middle_mcp.y
        # Ring and pinky curled.
        ring_tip = hand_landmarks.landmark[16]
        ring_mcp = hand_landmarks.landmark[13]
        pinky_tip = hand_landmarks.landmark[20]
        pinky_mcp = hand_landmarks.landmark[17]
        ring_curled = ring_tip.y > ring_mcp.y
        pinky_curled = pinky_tip.y > pinky_mcp.y
        return index_extended and middle_not_extended and ring_curled and pinky_curled

    def detect_waving(self, hand_landmarks):
        """Detect a waving gesture (simplified, single frame).

        A real wave needs temporal analysis; here we only check that the
        hand is open and raised.

        Bug fix: the raised-hand test was inverted for image coordinates.
        With y growing downward, a raised hand has its fingertips ABOVE the
        wrist, i.e. ``middle_tip.y < wrist.y`` (the original tested the
        opposite and matched a downward-pointing hand).
        """
        wrist = hand_landmarks.landmark[0]
        middle_tip = hand_landmarks.landmark[12]
        # Fingertips above the wrist => hand raised.
        hand_raised = middle_tip.y < wrist.y
        # Open hand: all four fingertips at roughly the same height.
        index_tip = hand_landmarks.landmark[8]
        ring_tip = hand_landmarks.landmark[16]
        pinky_tip = hand_landmarks.landmark[20]
        open_hand = (abs(index_tip.y - middle_tip.y) < 0.1 and
                     abs(middle_tip.y - ring_tip.y) < 0.1 and
                     abs(ring_tip.y - pinky_tip.y) < 0.1)
        return hand_raised and open_hand

    def detect_thumbs_up(self, hand_landmarks):
        """Detect thumbs up: thumb extended upward, index curled.

        Simplified check -- middle/ring/pinky are not verified.
        """
        thumb_tip = hand_landmarks.landmark[4]
        thumb_mcp = hand_landmarks.landmark[2]
        thumb_up = thumb_tip.y < thumb_mcp.y
        index_tip = hand_landmarks.landmark[8]
        index_mcp = hand_landmarks.landmark[5]
        index_curled = index_tip.y > index_mcp.y
        return thumb_up and index_curled

    def detect_open_palm(self, hand_landmarks):
        """Detect open palm: all four fingers extended above their MCPs."""
        tips = [8, 12, 16, 20]   # Index, middle, ring, pinky tips
        mcps = [5, 9, 13, 17]    # Corresponding MCP joints
        all_extended = True
        for tip, mcp in zip(tips, mcps):
            if hand_landmarks.landmark[tip].y > hand_landmarks.landmark[mcp].y:
                all_extended = False
                break
        return all_extended

    def detect_fist(self, hand_landmarks):
        """Detect closed fist: all fingertips below their MCP joints."""
        tips = [8, 12, 16, 20]   # Index, middle, ring, pinky tips
        mcps = [5, 9, 13, 17]    # Corresponding MCP joints
        all_curled = True
        for tip, mcp in zip(tips, mcps):
            if hand_landmarks.landmark[tip].y < hand_landmarks.landmark[mcp].y:
                all_curled = False
                break
        return all_curled

    def detect_peace(self, hand_landmarks):
        """Detect peace sign: index + middle extended, ring curled.

        Simplified check -- the pinky is not verified.
        """
        index_tip = hand_landmarks.landmark[8]
        index_mcp = hand_landmarks.landmark[5]
        middle_tip = hand_landmarks.landmark[12]
        middle_mcp = hand_landmarks.landmark[9]
        ring_tip = hand_landmarks.landmark[16]
        ring_mcp = hand_landmarks.landmark[13]
        index_extended = index_tip.y < index_mcp.y
        middle_extended = middle_tip.y < middle_mcp.y
        ring_curled = ring_tip.y > ring_mcp.y
        return index_extended and middle_extended and ring_curled

    def detect_pose_gestures(self, pose_landmarks):
        """Detect full-body pose gestures (currently: both arms raised)."""
        gestures = []
        left_shoulder = pose_landmarks.landmark[11]
        left_wrist = pose_landmarks.landmark[15]
        right_shoulder = pose_landmarks.landmark[12]
        right_wrist = pose_landmarks.landmark[16]
        # Both wrists above their shoulders.
        arms_raised = (left_wrist.y < left_shoulder.y and
                       right_wrist.y < right_shoulder.y)
        if arms_raised:
            gestures.append({
                'type': 'arms_raised',
                'confidence': 0.7
            })
        return gestures

    def get_gesture_direction(self, gesture_info, frame_shape):
        """Return the coarse direction of a pointing gesture.

        Returns ``(direction, (px, py))`` for a pointing gesture, or None
        for any other gesture type.
        """
        if gesture_info['type'] != 'pointing':
            return None
        # Index fingertip in normalized coordinates.
        index_tip = gesture_info['landmarks'].landmark[8]
        # Convert to pixel coordinates.
        h, w = frame_shape[:2]
        px, py = int(index_tip.x * w), int(index_tip.y * h)
        # Direction relative to frame center.
        center_x, center_y = w // 2, h // 2
        dx = px - center_x
        dy = py - center_y
        # Whichever axis dominates wins.
        if abs(dx) > abs(dy):
            direction = 'right' if dx > 0 else 'left'
        else:
            direction = 'down' if dy > 0 else 'up'
        return direction, (px, py)
2.2 Gesture Generation
class GestureGenerator:
    """Generate joint-space trajectories for expressive robot gestures.

    Each generator returns a list of ``{'time': t, 'pose': {joint: deg}}``
    frames sampled at 30 fps.
    """

    def __init__(self):
        # Gesture name -> trajectory generator taking a duration (seconds).
        # Bug fix: the library referenced generate_thumbs_up_sequence,
        # generate_shrug_sequence and generate_handshake_sequence, which
        # did not exist, so constructing this class raised AttributeError.
        # They are now implemented below.
        self.gesture_library = {
            'wave': self.generate_wave_sequence,
            'nod': self.generate_nod_sequence,
            'shake_head': self.generate_shake_sequence,
            'point': self.generate_point_sequence,
            'thumbs_up': self.generate_thumbs_up_sequence,
            'shrug': self.generate_shrug_sequence,
            'handshake': self.generate_handshake_sequence
        }
        # Joint limits in degrees.
        # NOTE(review): these limits are declared but not enforced by the
        # generators -- confirm whether clamping is expected downstream.
        self.joint_limits = {
            'head_pitch': [-30, 45],  # degrees
            'head_yaw': [-90, 90],
            'head_roll': [-45, 45],
            'left_shoulder_pitch': [-180, 180],
            'left_shoulder_roll': [-90, 90],
            'left_elbow': [-180, 0],
            'left_wrist': [-90, 90],
            'right_shoulder_pitch': [-180, 180],
            'right_shoulder_roll': [-90, 90],
            'right_elbow': [-180, 0],
            'right_wrist': [-90, 90]
        }

    def generate_gesture(self, gesture_name, duration=2.0, smooth=True):
        """Generate a joint trajectory for *gesture_name*.

        Returns None (with a message) for unknown gestures. When *smooth*
        is True, a moving-average filter is applied to the trajectory.
        """
        if gesture_name not in self.gesture_library:
            print(f"Gesture '{gesture_name}' not found")
            return None
        trajectory = self.gesture_library[gesture_name](duration)
        if smooth:
            trajectory = self.smooth_trajectory(trajectory)
        return trajectory

    def generate_wave_sequence(self, duration):
        """Generate a waving gesture: arm raised, wrist oscillating."""
        fps = 30
        num_frames = int(duration * fps)
        trajectory = []
        # Initial position (arm raised).
        initial_pose = {
            'right_shoulder_pitch': 90,
            'right_shoulder_roll': 30,
            'right_elbow': -90,
            'right_wrist': 0,
            'head_pitch': 0,
            'head_yaw': 0
        }
        for i in range(num_frames):
            t = i / fps
            wave_freq = 2.0       # Hz
            wave_amplitude = 30   # degrees
            pose = initial_pose.copy()
            pose['right_wrist'] = wave_amplitude * np.sin(2 * np.pi * wave_freq * t)
            trajectory.append({
                'time': t,
                'pose': pose
            })
        return trajectory

    def generate_nod_sequence(self, duration):
        """Generate a nodding gesture: head pitch oscillation."""
        fps = 30
        num_frames = int(duration * fps)
        trajectory = []
        for i in range(num_frames):
            t = i / fps
            nod_freq = 1.5       # Hz
            nod_amplitude = 15   # degrees
            pose = {
                'head_pitch': nod_amplitude * np.sin(2 * np.pi * nod_freq * t),
                'head_yaw': 0,
                'head_roll': 0
            }
            trajectory.append({
                'time': t,
                'pose': pose
            })
        return trajectory

    def generate_shake_sequence(self, duration):
        """Generate a head-shaking gesture: head yaw oscillation."""
        fps = 30
        num_frames = int(duration * fps)
        trajectory = []
        for i in range(num_frames):
            t = i / fps
            shake_freq = 2.0       # Hz
            shake_amplitude = 20   # degrees
            pose = {
                'head_pitch': 0,
                'head_yaw': shake_amplitude * np.sin(2 * np.pi * shake_freq * t),
                'head_roll': 0
            }
            trajectory.append({
                'time': t,
                'pose': pose
            })
        return trajectory

    def generate_point_sequence(self, duration):
        """Generate a pointing gesture: raise arm, point, hold, lower."""
        fps = 30
        num_frames = int(duration * fps)
        trajectory = []
        # Phase fractions of the total duration.
        phases = [0.2, 0.4, 0.3, 0.1]  # raise, point, hold, lower
        for i in range(num_frames):
            t = i / fps
            progress = t / duration
            if progress < phases[0]:
                # Raise arm (interpolate rest -> pointing).
                alpha = progress / phases[0]
                pose = self.interpolate_poses(
                    self.get_rest_pose(),
                    self.get_pointing_pose(),
                    alpha
                )
            elif progress < sum(phases[:2]):
                # Point.
                pose = self.get_pointing_pose()
            elif progress < sum(phases[:3]):
                # Hold.
                pose = self.get_pointing_pose()
            else:
                # Lower arm (interpolate pointing -> rest).
                alpha = (progress - sum(phases[:3])) / phases[3]
                pose = self.interpolate_poses(
                    self.get_pointing_pose(),
                    self.get_rest_pose(),
                    alpha
                )
            trajectory.append({
                'time': t,
                'pose': pose
            })
        return trajectory

    def generate_thumbs_up_sequence(self, duration):
        """Generate a thumbs-up gesture: raise the right forearm, then hold."""
        fps = 30
        num_frames = int(duration * fps)
        trajectory = []
        target = {
            'right_shoulder_pitch': 45,
            'right_shoulder_roll': 0,
            'right_elbow': -90,
            'right_wrist': 0,
            'head_pitch': 0,
            'head_yaw': 0
        }
        rest = {joint: self.get_rest_pose().get(joint, 0) for joint in target}
        raise_fraction = 0.3  # first 30% of the duration lifts the arm
        for i in range(num_frames):
            t = i / fps
            progress = t / duration
            if progress < raise_fraction:
                pose = self.interpolate_poses(rest, target, progress / raise_fraction)
            else:
                pose = target.copy()
            trajectory.append({'time': t, 'pose': pose})
        return trajectory

    def generate_shrug_sequence(self, duration):
        """Generate a shrug: raise and drop both shoulders once."""
        fps = 30
        num_frames = int(duration * fps)
        trajectory = []
        shrug_amplitude = 20  # degrees of shoulder lift
        for i in range(num_frames):
            t = i / fps
            progress = t / duration
            # One smooth raise-and-drop cycle over the full duration.
            lift = shrug_amplitude * np.sin(np.pi * progress)
            pose = {
                'left_shoulder_pitch': 20,
                'left_shoulder_roll': 10 + lift,
                'right_shoulder_pitch': 20,
                'right_shoulder_roll': -10 - lift,
                'head_pitch': 0,
                'head_yaw': 0,
                'head_roll': 0
            }
            trajectory.append({'time': t, 'pose': pose})
        return trajectory

    def generate_handshake_sequence(self, duration):
        """Generate a handshake: extend the right arm, then pump the wrist."""
        fps = 30
        num_frames = int(duration * fps)
        trajectory = []
        extended = {
            'right_shoulder_pitch': 60,
            'right_shoulder_roll': 0,
            'right_elbow': -30,
            'right_wrist': 0,
            'head_pitch': 0,
            'head_yaw': 0
        }
        rest = {joint: self.get_rest_pose().get(joint, 0) for joint in extended}
        reach_fraction = 0.25  # first 25% of the duration reaches out
        shake_freq = 2.0       # Hz
        shake_amplitude = 10   # degrees of wrist pump
        for i in range(num_frames):
            t = i / fps
            progress = t / duration
            if progress < reach_fraction:
                pose = self.interpolate_poses(rest, extended, progress / reach_fraction)
            else:
                pose = extended.copy()
                pose['right_wrist'] = shake_amplitude * np.sin(2 * np.pi * shake_freq * t)
            trajectory.append({'time': t, 'pose': pose})
        return trajectory

    def get_rest_pose(self):
        """Return the neutral/rest pose (degrees)."""
        return {
            'head_pitch': 0,
            'head_yaw': 0,
            'head_roll': 0,
            'left_shoulder_pitch': 20,
            'left_shoulder_roll': 10,
            'left_elbow': -20,
            'left_wrist': 0,
            'right_shoulder_pitch': 20,
            'right_shoulder_roll': -10,
            'right_elbow': -20,
            'right_wrist': 0
        }

    def get_pointing_pose(self):
        """Return the pointing pose: right arm extended forward."""
        return {
            'head_pitch': 0,
            'head_yaw': 0,
            'head_roll': 0,
            'left_shoulder_pitch': 20,
            'left_shoulder_roll': 10,
            'left_elbow': -20,
            'left_wrist': 0,
            'right_shoulder_pitch': 90,
            'right_shoulder_roll': 0,
            'right_elbow': 0,
            'right_wrist': 0
        }

    def interpolate_poses(self, pose1, pose2, alpha):
        """Linearly interpolate joint-by-joint between two poses.

        Assumes *pose2* contains every joint present in *pose1*.
        """
        interpolated = {}
        for joint in pose1:
            interpolated[joint] = pose1[joint] * (1 - alpha) + pose2[joint] * alpha
        return interpolated

    def smooth_trajectory(self, trajectory, window_size=5):
        """Apply a centered moving-average filter to each joint track.

        Assumes every frame in the trajectory has the same joint keys.
        """
        if len(trajectory) < window_size:
            return trajectory
        smoothed = []
        for i in range(len(trajectory)):
            # Window clipped at the trajectory boundaries.
            start = max(0, i - window_size // 2)
            end = min(len(trajectory), i + window_size // 2 + 1)
            smoothed_pose = {}
            for joint in trajectory[i]['pose']:
                values = [traj['pose'][joint] for traj in trajectory[start:end]]
                smoothed_pose[joint] = np.mean(values)
            smoothed.append({
                'time': trajectory[i]['time'],
                'pose': smoothed_pose
            })
        return smoothed

    def generate_contextual_gesture(self, intent, entities):
        """Pick and generate a gesture appropriate to a dialogue intent.

        Returns None when no gesture fits the intent.
        """
        if intent == 'greeting':
            return self.generate_gesture('wave', duration=1.5)
        elif intent == 'request':
            return self.generate_gesture('nod', duration=1.0)
        elif intent == 'confirmation':
            return self.generate_gesture('thumbs_up', duration=1.0)
        elif intent == 'farewell':
            return self.generate_gesture('wave', duration=2.0)
        elif intent == 'uncertainty':
            return self.generate_gesture('shrug', duration=1.5)
        else:
            return None
3. Social Cognition and Theory of Mind
3.1 Attention Estimation
class AttentionEstimator:
    """Estimate where a person in a video frame is looking, using Haar
    cascades for face/eye detection and a simplified pupil-based gaze
    heuristic. The detection thresholds and cv2 call parameters below are
    tuned as a unit; keep them in sync if any are changed."""

    def __init__(self):
        # Haar cascades bundled with OpenCV for frontal faces and eyes.
        self.face_detector = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )
        self.eye_detector = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_eye.xml'
        )
        # Load eye gaze estimation model
        self.gaze_model = self.load_gaze_model()

    def load_gaze_model(self):
        """Load pre-trained gaze estimation model.

        Placeholder: a CNN-based gaze model would be loaded here; the
        pipeline below currently runs entirely on the heuristic path.
        """
        # This would load a CNN model for gaze estimation
        # For now, return None as placeholder
        return None

    def estimate_attention(self, frame):
        """Estimate where each detected person is looking.

        Returns a list of dicts (one per face with at least two detected
        eyes) containing the face box, gaze direction, gaze point in
        pixels, and a coarse attention-target label.
        """
        # Haar cascades need grayscale input; 1.1 scale factor, 4 neighbors.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_detector.detectMultiScale(gray, 1.1, 4)
        attention_info = []
        for (x, y, w, h) in faces:
            # NOTE(review): eyes are detected on the BGR face crop, not the
            # grayscale one -- works, but confirm this is intentional.
            face_roi = frame[y:y+h, x:x+w]
            # Detect eyes
            eyes = self.eye_detector.detectMultiScale(face_roi)
            if len(eyes) >= 2:
                # Estimate gaze direction
                gaze_direction = self.estimate_gaze_direction(face_roi, eyes)
                # Calculate 3D gaze point (simplified)
                gaze_point = self.calculate_gaze_point(
                    (x + w//2, y + h//2),
                    gaze_direction,
                    frame.shape
                )
                attention_info.append({
                    'face_location': (x, y, w, h),
                    'gaze_direction': gaze_direction,
                    'gaze_point': gaze_point,
                    'attention_target': self.infer_attention_target(gaze_point, frame.shape)
                })
        return attention_info

    def estimate_gaze_direction(self, face_roi, eyes):
        """Estimate gaze direction from eye regions.

        Returns a (gaze_x, gaze_y) pair of pupil offsets normalized by the
        eye-center coordinates within the face ROI. Assumes the two
        leftmost detections in *eyes* are the actual eyes.
        """
        # Sort eyes by x coordinate (left then right)
        eyes = sorted(eyes, key=lambda e: e[0])
        left_eye, right_eye = eyes[0], eyes[1]
        # Crop each eye region out of the face ROI (eye boxes are x, y, w, h).
        left_eye_roi = face_roi[left_eye[1]:left_eye[1]+left_eye[3],
                                left_eye[0]:left_eye[0]+left_eye[2]]
        right_eye_roi = face_roi[right_eye[1]:right_eye[1]+right_eye[3],
                                 right_eye[0]:right_eye[0]+right_eye[2]]
        # Simplified gaze estimation (would use CNN in practice)
        # Estimate pupil position
        left_pupil = self.estimate_pupil_position(left_eye_roi)
        right_pupil = self.estimate_pupil_position(right_eye_roi)
        # Average the two pupil positions.
        avg_pupil_x = (left_pupil[0] + right_pupil[0]) / 2
        avg_pupil_y = (left_pupil[1] + right_pupil[1]) / 2
        # Midpoint of the two eye boxes, in face-ROI coordinates.
        eye_center_x = (left_eye[0] + left_eye[2]/2 + right_eye[0] + right_eye[2]/2) / 2
        eye_center_y = (left_eye[1] + left_eye[3]/2 + right_eye[1] + right_eye[3]/2) / 2
        # NOTE(review): pupil coords are local to the eye crop while the
        # center is in face-ROI coords; dividing by the center normalizes
        # only roughly -- confirm this heuristic against a calibrated model.
        gaze_x = (avg_pupil_x - eye_center_x) / eye_center_x
        gaze_y = (avg_pupil_y - eye_center_y) / eye_center_y
        return (gaze_x, gaze_y)

    def estimate_pupil_position(self, eye_roi):
        """Estimate pupil position (cx, cy) within an eye crop.

        Thresholds for dark pixels and takes the centroid of the largest
        dark blob; falls back to the crop center when nothing is found.
        """
        # Convert to grayscale
        gray_eye = cv2.cvtColor(eye_roi, cv2.COLOR_BGR2GRAY)
        # Inverted binary threshold: pixels darker than 50 become white blobs.
        _, threshold = cv2.threshold(gray_eye, 50, 255, cv2.THRESH_BINARY_INV)
        # Find contours
        contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if contours:
            # Find largest contour (assumed to be pupil)
            largest_contour = max(contours, key=cv2.contourArea)
            # Blob centroid from image moments (guard against zero area).
            M = cv2.moments(largest_contour)
            if M["m00"] != 0:
                cx = int(M["m10"] / M["m00"])
                cy = int(M["m01"] / M["m00"])
                return (cx, cy)
        # Fallback: return center of eye region
        return (eye_roi.shape[1] // 2, eye_roi.shape[0] // 2)

    def calculate_gaze_point(self, face_center, gaze_direction, frame_shape):
        """Project the gaze direction to a pixel in the frame.

        Simplified planar projection: offsets the face center by a fixed
        100-pixel scale along the gaze direction and clamps to the frame.
        """
        # Simplified calculation assuming looking at a plane at z=1
        # NOTE(review): fx/fy are declared but unused by the simplified
        # projection below -- a calibrated version would need them.
        fx, fy = 500, 500  # Focal lengths (approximate)
        cx, cy = face_center
        # Gaze direction in image coordinates
        gx, gy = gaze_direction
        # Calculate intersection with plane
        # This is simplified - would need proper camera calibration in practice
        gaze_point_x = cx + gx * 100
        gaze_point_y = cy + gy * 100
        # Clamp to frame bounds
        gaze_point_x = max(0, min(frame_shape[1], gaze_point_x))
        gaze_point_y = max(0, min(frame_shape[0], gaze_point_y))
        return (int(gaze_point_x), int(gaze_point_y))

    def infer_attention_target(self, gaze_point, frame_shape):
        """Map a gaze pixel to a named screen region.

        Regions are checked in dict insertion order, so the small
        'robot_face' box at the center wins over the broad thirds.
        """
        x, y = gaze_point
        h, w = frame_shape[:2]
        # Regions of interest as (x, y, width, height) boxes.
        regions = {
            'robot_face': (w//2 - 50, h//2 - 50, 100, 100),
            'left': (0, 0, w//3, h),
            'center': (w//3, 0, w//3, h),
            'right': (2*w//3, 0, w//3, h),
            'up': (0, 0, w, h//3),
            'down': (0, 2*h//3, w, h//3)
        }
        # First region containing the gaze point wins.
        for region_name, (rx, ry, rw, rh) in regions.items():
            if rx <= x < rx + rw and ry <= y < ry + rh:
                return region_name
        return 'unknown'

    def track_joint_attention(self, robot_gaze_point, human_attention_info):
        """Return True when any human gaze point lies within 50 pixels of
        the robot's gaze point (a proxy for joint attention)."""
        if not human_attention_info:
            return False
        for attention in human_attention_info:
            human_gaze = attention['gaze_point']
            # Euclidean distance between the two gaze points, in pixels.
            distance = np.sqrt(
                (robot_gaze_point[0] - human_gaze[0])**2 +
                (robot_gaze_point[1] - human_gaze[1])**2
            )
            # Threshold for joint attention
            if distance < 50:  # pixels
                return True
        return False
3.2 Intention Recognition
class IntentionRecognizer:
    """Infer human intentions from detected actions, optional speech, and
    task context, using simple pattern-matching scores."""

    def __init__(self):
        # Intention name -> list of action types that are evidence for it.
        self.intention_patterns = {
            'hand_over_object': [
                'extend_arm',
                'open_hand',
                'move_toward_person'
            ],
            'request_object': [
                'point_to_object',
                'make_eye_contact',
                'open_palm'
            ],
            'give_instructions': [
                'raise_hand',
                'make_gesture',
                'maintain_eye_contact'
            ],
            'ask_for_help': [
                'raise_both_hands',
                'look_around',
                'approach_robot'
            ]
        }
        # Mutable context consulted by apply_context / update_context.
        self.context = {
            'current_task': None,
            'recent_actions': [],
            'environment': {}
        }

    def recognize_intention(self, detected_actions, speech_content=None, context=None):
        """Recognize the most likely human intention.

        *detected_actions* is a list of ``{'type': ...}`` dicts; speech
        keywords add extra weight. Returns a dict with 'intention',
        'confidence' and the raw per-intention 'scores'.

        Bug fix: when nothing matched, every score was 0 and the
        confidence division raised ZeroDivisionError; a zero total now
        falls through to the 'unknown' result.
        """
        if context:
            self.context.update(context)
        # One score slot per known intention.
        intention_scores = {intention: 0 for intention in self.intention_patterns}
        # Each matching action contributes one point.
        for action in detected_actions:
            action_type = action['type']
            for intention, pattern in self.intention_patterns.items():
                if action_type in pattern:
                    intention_scores[intention] += 1
        # Speech evidence is weighted more heavily than gesture evidence.
        if speech_content:
            speech_intentions = self.extract_intentions_from_speech(speech_content)
            for intention in speech_intentions:
                if intention in intention_scores:
                    intention_scores[intention] += 2  # Higher weight for speech
        # Apply task/temporal context multipliers.
        intention_scores = self.apply_context(intention_scores)
        total = sum(intention_scores.values())
        if total > 0:
            best_intention = max(intention_scores, key=intention_scores.get)
            confidence = intention_scores[best_intention] / total
            if confidence > 0.3:  # Minimum confidence threshold
                return {
                    'intention': best_intention,
                    'confidence': confidence,
                    'scores': intention_scores
                }
        return {
            'intention': 'unknown',
            'confidence': 0,
            'scores': intention_scores
        }

    def extract_intentions_from_speech(self, speech_text):
        """Extract candidate intentions from speech via keyword matching."""
        intentions = []
        text_lower = speech_text.lower()
        # Keyword-based intention extraction
        if any(word in text_lower for word in ['take', 'here', 'hold']):
            intentions.append('hand_over_object')
        if any(word in text_lower for word in ['can', 'could', 'please', 'help']):
            intentions.append('request_object')
        if any(word in text_lower for word in ['show', 'do', 'how to']):
            intentions.append('give_instructions')
        if any(word in text_lower for word in ['help me', 'assist', 'support']):
            intentions.append('ask_for_help')
        return intentions

    def apply_context(self, intention_scores):
        """Scale intention scores by task context and temporal consistency."""
        context_scores = intention_scores.copy()
        # Task-specific boosts.
        if self.context.get('current_task') == 'collaborative_assembly':
            context_scores['hand_over_object'] *= 1.5
            context_scores['request_object'] *= 1.2
        elif self.context.get('current_task') == 'learning_demonstration':
            context_scores['give_instructions'] *= 2.0
        # Favor continuing whatever intention we saw most recently.
        recent_intentions = [action.get('intention') for action in self.context.get('recent_actions', [])]
        if recent_intentions:
            last_intention = recent_intentions[-1]
            if last_intention in context_scores:
                context_scores[last_intention] *= 1.3  # Favor continuity
        return context_scores

    def predict_next_action(self, current_intention):
        """Return a list of likely robot follow-up actions (with
        probabilities) for *current_intention*; empty for unknown ones."""
        action_predictions = {
            'hand_over_object': [
                {'action': 'receive_object', 'probability': 0.6},
                {'action': 'confirm_receipt', 'probability': 0.3},
                {'action': 'examine_object', 'probability': 0.1}
            ],
            'request_object': [
                {'action': 'retrieve_object', 'probability': 0.7},
                {'action': 'clarify_request', 'probability': 0.2},
                {'action': 'request_location', 'probability': 0.1}
            ],
            'give_instructions': [
                {'action': 'acknowledge', 'probability': 0.5},
                {'action': 'ask_question', 'probability': 0.3},
                {'action': 'prepare_to_follow', 'probability': 0.2}
            ],
            'ask_for_help': [
                {'action': 'offer_assistance', 'probability': 0.6},
                {'action': 'assess_situation', 'probability': 0.3},
                {'action': 'request_details', 'probability': 0.1}
            ]
        }
        return action_predictions.get(current_intention, [])

    def update_context(self, new_info):
        """Merge *new_info* into the context, tracking recent actions."""
        self.context.update(new_info)
        # Record the action with a timestamp for temporal consistency.
        if 'action' in new_info:
            self.context['recent_actions'].append({
                'action': new_info['action'],
                'timestamp': time.time(),
                'intention': new_info.get('intention')
            })
            # Keep only the last 10 actions.
            if len(self.context['recent_actions']) > 10:
                self.context['recent_actions'] = self.context['recent_actions'][-10:]
4. Collaborative Task Planning
4.1 Task Allocation and Coordination
class TaskPlanner:
def __init__(self):
self.robot_capabilities = {
'perception': 0.9,
'manipulation': 0.8,
'navigation': 0.95,
'communication': 0.85,
'heavy_lifting': 0.7
}
self.task_requirements = {
'object_search': ['perception', 'navigation'],
'object_picking': ['perception', 'manipulation'],
'object_delivery': ['manipulation', 'navigation'],
'heavy_lifting': ['heavy_lifting', 'manipulation'],
'information_providing': ['communication', 'perception']
}
def decompose_task(self, high_level_task):
"""Decompose high-level task into subtasks"""
task_templates = {
'clean_room': [
{'name': 'scan_room', 'type': 'perception'},
{'name': 'identify_objects', 'type': 'perception'},
{'name': 'pick_up_trash', 'type': 'manipulation'},
{'name': 'move_objects', 'type': 'manipulation'}
],
'prepare_snack': [
{'name': 'locate_ingredients', 'type': 'perception'},
{'name': 'fetch_items', 'type': 'navigation'},
{'name': 'prepare_food', 'type': 'manipulation'},
{'name': 'serve_snack', 'type': 'manipulation'}
],
'help_with_cooking': [
{'name': 'read_recipe', 'type': 'perception'},
{'name': 'fetch_ingredients', 'type': 'navigation'},
{'name': 'hand_over_tools', 'type': 'manipulation'},
{'name': 'clean_workspace', 'type': 'manipulation'}
]
}
return task_templates.get(high_level_task, [])
def allocate_tasks(self, subtasks, human_capabilities):
"""Allocate tasks between human and robot"""
allocation = []
for subtask in subtasks:
# Calculate robot capability score
required_capabilities = self.task_requirements.get(subtask['type'], [])
robot_score = sum(self.robot_capabilities.get(cap, 0) for cap in required_capabilities) / len(required_capabilities)
# Calculate human capability score (estimated)
human_score = sum(human_capabilities.get(cap, 0.5) for cap in required_capabilities) / len(required_capabilities)
# Determine allocation
if robot_score > human_score + 0.2:
# Robot is significantly better
agent = 'robot'
confidence = robot_score
elif human_score > robot_score + 0.2:
# Human is significantly better
agent = 'human'
confidence = human_score
else:
# Both are similar - consider collaboration
agent = 'collaborative'
confidence = (robot_score + human_score) / 2
allocation.append({
'subtask': subtask,
'agent': agent,
'confidence': confidence,
'robot_score': robot_score,
'human_score': human_score
})
return allocation
def generate_collaboration_plan(self, task_allocation):
    """Turn an allocation list into a sequenced execution plan.

    Builds one step per allocation (with an estimated duration), chains
    each step to its predecessor via 'depends_on', records a handoff
    whenever a non-collaborative step is followed by a collaborative one,
    and finally annotates parallelizable steps.
    """
    plan = {
        'steps': [],
        'dependencies': {},
        'handoffs': []
    }
    previous_agent = None
    for index, entry in enumerate(task_allocation):
        step = {
            'id': index,
            'task': entry['subtask']['name'],
            'agent': entry['agent'],
            'estimated_duration': self.estimate_task_duration(entry['subtask']),
            'parallel_with': [],
            'handoff_to': None,
        }
        # Simple linear dependency chain: each step waits on the last.
        if index > 0:
            step['depends_on'] = [index - 1]
        if entry['agent'] == 'collaborative':
            step['requires_coordination'] = True
            # Transitioning from a single agent into joint work needs a handoff.
            if previous_agent is not None and previous_agent != 'collaborative':
                plan['handoffs'].append({
                    'from': previous_agent,
                    'to': 'both',
                    'at_step': index,
                })
        plan['steps'].append(step)
        previous_agent = entry['agent']
    # Mark steps that may run concurrently.
    return self.optimize_parallel_execution(plan)
def estimate_task_duration(self, subtask):
    """Estimate the subtask's duration in seconds with +/-20% random jitter.

    Uses a nominal per-type duration (3.0 s for unknown types) scaled by a
    uniform random factor in [0.8, 1.2].
    """
    nominal_seconds = {
        'perception': 2.0,
        'navigation': 3.0,
        'manipulation': 4.0,
        'communication': 1.0,
    }.get(subtask['type'], 3.0)
    # Random jitter models real-world execution variability.
    return nominal_seconds * np.random.uniform(0.8, 1.2)
def optimize_parallel_execution(self, plan):
    """Annotate each step with the ids of steps it could run alongside.

    Checks every ordered pair of distinct steps; compatible partners are
    appended to the step's 'parallel_with' list. Returns the plan.
    """
    steps = plan['steps']
    for idx, step in enumerate(steps):
        for partner_idx, partner in enumerate(steps):
            if partner_idx == idx:
                continue
            if self.can_be_parallel(step, partner, plan):
                step['parallel_with'].append(partner_idx)
    return plan
def can_be_parallel(self, step1, step2, plan):
    """Return True when two steps have no dependency, agent, or location conflict."""
    # A step can never run alongside one it depends on (either direction).
    if step2['id'] in step1.get('depends_on', []):
        return False
    if step1['id'] in step2.get('depends_on', []):
        return False
    # The same single agent cannot do two things at once; only
    # collaborative steps share an "agent" without conflicting.
    if step1['agent'] == step2['agent'] and step1['agent'] != 'collaborative':
        return False
    # Two steps in the same physical zone would collide (simplified check;
    # 'different' is the sentinel for "no fixed location").
    location_a = self.get_task_location(step1['task'])
    location_b = self.get_task_location(step2['task'])
    if location_a == location_b and location_a != 'different':
        return False
    return True
def get_task_location(self, task_name):
    """Return the nominal workspace zone for a task name.

    Unknown tasks map to 'different', which the parallelism check treats
    as "no fixed location".
    """
    zone_table = (
        ('scan_room', 'central'),
        ('identify_objects', 'variable'),
        ('pick_up_trash', 'floor'),
        ('locate_ingredients', 'kitchen'),
        ('fetch_items', 'variable'),
        ('hand_over_tools', 'workspace'),
    )
    for name, zone in zone_table:
        if name == task_name:
            return zone
    return 'different'
4.2 Coordination and Synchronization
class TaskCoordinator:
    """Executes a collaborative plan step by step, dispatching each step to
    the robot, the human, or a coordinated combination of both.

    NOTE(review): this class calls ``time.time()`` but no ``import time`` is
    visible in this chunk — confirm the module is imported at file top.
    NOTE(review): ``coordinated_execution`` calls ``execute_handover_sequence``,
    ``execute_joint_lifting_sequence`` and ``execute_generic_coordination``,
    none of which are defined in this chunk — confirm they exist elsewhere.
    """

    def __init__(self):
        # Plan dict currently under execution (set by execute_collaborative_plan).
        self.current_plan = None
        # step id -> {'status', 'start_time', 'end_time', 'agent'}
        self.execution_status = {}
        # Reserved for queued outgoing messages (not used in this chapter).
        self.communication_queue = []

    def execute_collaborative_plan(self, plan, robot_controller, human_interface):
        """Execute every step of `plan` in order and return a summary dict.

        `robot_controller` and `human_interface` are the effector/UI
        abstractions the individual steps are dispatched through.
        """
        self.current_plan = plan
        # Initialize execution status for every step before starting.
        for step in plan['steps']:
            self.execution_status[step['id']] = {
                'status': 'pending',
                'start_time': None,
                'end_time': None,
                'agent': step['agent']
            }
        # Execute steps sequentially, in plan order.
        for step in plan['steps']:
            self.execute_step(step, robot_controller, human_interface)
        return self.get_execution_summary()

    def execute_step(self, step, robot_controller, human_interface):
        """Execute one step; returns True on success, False otherwise.

        Skips the step (returning False) when its dependencies have not
        completed. Records start/end timestamps and final status.
        """
        step_id = step['id']
        # Check dependencies before doing anything.
        if not self.check_dependencies(step):
            print(f"Dependencies not met for step {step_id}")
            return False
        # Mark as running.
        self.execution_status[step_id]['status'] = 'executing'
        self.execution_status[step_id]['start_time'] = time.time()
        # Dispatch on the assigned agent.
        if step['agent'] == 'robot':
            success = self.execute_robot_step(step, robot_controller)
        elif step['agent'] == 'human':
            success = self.execute_human_step(step, human_interface)
        elif step['agent'] == 'collaborative':
            success = self.execute_collaborative_step(step, robot_controller, human_interface)
        else:
            # Unknown agent label — treat as failure.
            success = False
        # Record outcome and end time.
        self.execution_status[step_id]['status'] = 'completed' if success else 'failed'
        self.execution_status[step_id]['end_time'] = time.time()
        return success

    def check_dependencies(self, step):
        """Return True when every step in `depends_on` has completed."""
        if 'depends_on' not in step:
            return True
        for dep_id in step['depends_on']:
            if self.execution_status[dep_id]['status'] != 'completed':
                return False
        return True

    def execute_robot_step(self, step, robot_controller):
        """Execute a robot-only step by keyword-matching the task name.

        Falls back to a generic skill invocation when the name matches no
        known verb.
        """
        print(f"Robot executing: {step['task']}")
        # Substring dispatch on the task name.
        if 'navigate' in step['task']:
            success = robot_controller.navigate_to_location(step.get('location'))
        elif 'pick' in step['task']:
            success = robot_controller.pick_object(step.get('object'))
        elif 'place' in step['task']:
            success = robot_controller.place_object(step.get('location'))
        else:
            # Generic task execution.
            success = robot_controller.execute_skill(step['task'])
        return success

    def execute_human_step(self, step, human_interface):
        """Instruct the human to perform the step and wait for confirmation.

        Returns whatever the interface reports within the 30 s timeout.
        """
        print(f"Waiting for human to: {step['task']}")
        # Provide instruction to the human via the interface.
        instruction = self.generate_human_instruction(step)
        human_interface.provide_instruction(instruction)
        # Block until the human confirms (or the timeout elapses).
        human_ready = human_interface.wait_for_confirmation(timeout=30)
        return human_ready

    def execute_collaborative_step(self, step, robot_controller, human_interface):
        """Execute a step requiring both agents: sync, assign roles, run."""
        print(f"Collaborative execution: {step['task']}")
        # Both agents must signal readiness before any joint motion.
        if not self.synchronize_agents(step, robot_controller, human_interface):
            return False
        # Decide who does what for this task type.
        roles = self.define_collaborative_roles(step)
        # Execute with explicit coordination signals.
        success = self.coordinated_execution(
            step,
            roles,
            robot_controller,
            human_interface
        )
        return success

    def synchronize_agents(self, step, robot_controller, human_interface):
        """Handshake before a collaborative task; True when both are ready."""
        # Announce the upcoming joint task to the human.
        human_interface.initiate_communication(
            f"Ready to collaborate on: {step['task']}?"
        )
        # Wait for the human to confirm readiness.
        human_ready = human_interface.wait_for_readiness()
        if human_ready:
            # Robot acknowledges with its own readiness signal.
            robot_controller.signal_ready()
            return True
        return False

    def define_collaborative_roles(self, step):
        """Map the inferred task type to a robot/human role assignment.

        Unknown types default to robot-as-assistant, human-as-leader.
        """
        role_map = {
            'hand_over': {
                'robot': 'object_holder',
                'human': 'object_receiver'
            },
            'joint_lifting': {
                'robot': 'primary_support',
                'human': 'secondary_support'
            },
            'guided_assembly': {
                'robot': 'part_provider',
                'human': 'assembler'
            },
            'collaborative_cleaning': {
                'robot': 'collector',
                'human': 'organizer'
            }
        }
        task_type = self.infer_task_type(step['task'])
        return role_map.get(task_type, {'robot': 'assistant', 'human': 'leader'})

    def infer_task_type(self, task_name):
        """Infer a coarse collaborative task type from keywords in the name."""
        if 'hand' in task_name or 'pass' in task_name:
            return 'hand_over'
        elif 'lift' in task_name or 'carry' in task_name:
            return 'joint_lifting'
        elif 'assemble' in task_name or 'build' in task_name:
            return 'guided_assembly'
        elif 'clean' in task_name or 'organize' in task_name:
            return 'collaborative_cleaning'
        else:
            return 'general'

    def coordinated_execution(self, step, roles, robot_controller, human_interface):
        """Run the joint action once coordination signals are exchanged.

        Dispatches on the robot's role to a role-specific sequence.
        NOTE(review): the three sequence helpers called below are not
        defined in this chunk — confirm they exist elsewhere in the file.
        """
        # Exchange go/no-go signals first.
        exchange_successful = self.exchange_coordination_signals(
            step,
            roles,
            robot_controller,
            human_interface
        )
        if not exchange_successful:
            return False
        # Execute coordinated actions by robot role.
        if roles['robot'] == 'object_holder':
            # Robot holds the object while the human receives it.
            success = self.execute_handover_sequence(
                step, robot_controller, human_interface
            )
        elif roles['robot'] == 'primary_support':
            # Robot carries the main load in a joint lift.
            success = self.execute_joint_lifting_sequence(
                step, robot_controller, human_interface
            )
        else:
            # Generic coordinated execution for all other roles.
            success = self.execute_generic_coordination(
                step, robot_controller, human_interface
            )
        return success

    def exchange_coordination_signals(self, step, roles, robot_controller, human_interface):
        """Send the robot's intent and wait (5 s) for a matching human 'ready'."""
        # Robot declares its planned action and role.
        robot_signal = {
            'action': step['task'],
            'role': roles['robot'],
            'timing': 'ready'
        }
        human_interface.send_robot_signal(robot_signal)
        # Wait briefly for the human's acknowledgement.
        human_response = human_interface.wait_for_signal(timeout=5)
        if human_response and human_response.get('status') == 'ready':
            return True
        return False

    def generate_human_instruction(self, step):
        """Produce a spoken/displayed instruction for a human-assigned step.

        Keyword lookup over canned phrases; falls back to echoing the task
        name.
        """
        instructions = {
            'fetch': 'Please fetch the specified object',
            'hold': 'Please hold this object steady',
            'guide': 'Please guide me to the location',
            'verify': 'Please verify this is correct',
            'adjust': 'Please make adjustments as needed'
        }
        for key in instructions:
            if key in step['task']:
                return instructions[key]
        return f"Please: {step['task']}"

    def get_execution_summary(self):
        """Summarize the last execution: counts, wall time, per-step detail.

        'total_time' spans the earliest start to the latest end across all
        steps (0 if nothing ran).
        """
        summary = {
            'total_steps': len(self.current_plan['steps']),
            'completed': 0,
            'failed': 0,
            'total_time': 0,
            'step_details': []
        }
        start_time = None
        end_time = None
        for step in self.current_plan['steps']:
            status = self.execution_status[step['id']]
            if status['status'] == 'completed':
                summary['completed'] += 1
            elif status['status'] == 'failed':
                summary['failed'] += 1
            # Track the overall execution window.
            if status['start_time']:
                if start_time is None or status['start_time'] < start_time:
                    start_time = status['start_time']
            if status['end_time'] and (end_time is None or status['end_time'] > end_time):
                end_time = status['end_time']
            summary['step_details'].append({
                'step': step['task'],
                'agent': status['agent'],
                'status': status['status'],
                'duration': (status['end_time'] - status['start_time']) if status['end_time'] else None
            })
        if start_time and end_time:
            summary['total_time'] = end_time - start_time
        return summary
5. Emotional Intelligence and Expression
5.1 Emotion Recognition
class EmotionRecognizer:
    """Fuses face, voice, and gesture cues into a single emotion estimate.

    NOTE(review): uses ``cv2`` and ``np``, neither imported in this chunk —
    confirm both are imported at file top. The CNN emotion model is a
    placeholder (load_emotion_model returns None; predict_emotion returns
    random scores), so facial results are not meaningful yet.
    """

    def __init__(self):
        # Haar-cascade frontal face detector bundled with OpenCV.
        self.face_detector = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )
        # Emotion classification model (placeholder — see load_emotion_model).
        self.emotion_model = self.load_emotion_model()
        # Canonical emotion label set used for all score dicts.
        self.emotions = ['neutral', 'happy', 'sad', 'angry', 'fear', 'surprise', 'disgust']
        # Per-modality analyzers (defined later in this file).
        self.voice_emotion_analyzer = VoiceEmotionAnalyzer()
        self.gesture_emotion_analyzer = GestureEmotionAnalyzer()

    def load_emotion_model(self):
        """Load a pre-trained facial-emotion CNN (e.g. trained on FER2013).

        Placeholder: returns None; predict_emotion substitutes random scores.
        """
        # This would load a CNN model trained on FER2013 or similar dataset.
        # For now, return placeholder.
        return None

    def recognize_emotions(self, frame, audio_data=None, gesture_data=None):
        """Fuse available modalities into one normalized emotion estimate.

        Fixed fusion weights: face 0.4, voice 0.3, gesture 0.3; modalities
        whose input is missing contribute nothing. Returns a dict with the
        dominant emotion, its confidence, all normalized scores, and the
        raw per-modality results.
        """
        emotion_scores = {emotion: 0 for emotion in self.emotions}
        # Face-based emotion recognition (always attempted).
        face_emotions = self.recognize_facial_emotions(frame)
        if face_emotions:
            for emotion, score in face_emotions.items():
                emotion_scores[emotion] += score * 0.4  # Weight for face
        # Voice-based emotion recognition (optional input).
        if audio_data:
            voice_emotions = self.voice_emotion_analyzer.analyze(audio_data)
            if voice_emotions:
                for emotion, score in voice_emotions.items():
                    emotion_scores[emotion] += score * 0.3  # Weight for voice
        # Gesture-based emotion recognition (optional input).
        if gesture_data:
            gesture_emotions = self.gesture_emotion_analyzer.analyze(gesture_data)
            if gesture_emotions:
                for emotion, score in gesture_emotions.items():
                    emotion_scores[emotion] += score * 0.3  # Weight for gesture
        # Normalize scores so they sum to 1 (when anything fired).
        total_score = sum(emotion_scores.values())
        if total_score > 0:
            for emotion in emotion_scores:
                emotion_scores[emotion] /= total_score
        # Dominant emotion = highest fused score.
        dominant_emotion = max(emotion_scores, key=emotion_scores.get)
        confidence = emotion_scores[dominant_emotion]
        return {
            'dominant_emotion': dominant_emotion,
            'confidence': confidence,
            'all_scores': emotion_scores,
            'sources': {
                'face': face_emotions,
                'voice': voice_emotions if audio_data else None,
                'gesture': gesture_emotions if gesture_data else None
            }
        }

    def recognize_facial_emotions(self, frame):
        """Detect faces in a BGR frame and average per-face emotion scores.

        Returns an emotion->score dict, or None when no face is found.
        """
        # Haar cascades operate on grayscale images.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # scaleFactor=1.1, minNeighbors=4.
        faces = self.face_detector.detectMultiScale(gray, 1.1, 4)
        emotions_detected = []
        for (x, y, w, h) in faces:
            # Extract the face region of interest.
            face_roi = gray[y:y+h, x:x+w]
            # 48x48 is the FER2013-style CNN input size.
            face_resized = cv2.resize(face_roi, (48, 48))
            # Scale pixel values to [0, 1].
            face_normalized = face_resized / 255.0
            # Add batch and channel dims: (1, 48, 48, 1).
            face_input = face_normalized.reshape(1, 48, 48, 1)
            # Predict emotion (placeholder — random scores until a model loads).
            emotion_prediction = self.predict_emotion(face_input)
            emotions_detected.append({
                'location': (x, y, w, h),
                'emotions': emotion_prediction
            })
        # Average across faces when more than one is detected.
        if emotions_detected:
            avg_emotions = {}
            for emotion in self.emotions:
                scores = [det['emotions'].get(emotion, 0) for det in emotions_detected]
                avg_emotions[emotion] = np.mean(scores)
            return avg_emotions
        return None

    def predict_emotion(self, face_input):
        """Predict emotion scores for one preprocessed face.

        Placeholder: returns a random normalized distribution over
        self.emotions instead of a real model inference.
        """
        # This would use the actual loaded model.
        scores = np.random.rand(len(self.emotions))
        scores = scores / scores.sum()  # Normalize
        return dict(zip(self.emotions, scores))
class VoiceEmotionAnalyzer:
    """Estimates the speaker's emotion from simple acoustic statistics."""

    def __init__(self):
        # Labels this analyzer can emit (no 'disgust' channel for voice).
        self.emotion_labels = ['neutral', 'happy', 'sad', 'angry', 'fear', 'surprise']

    def analyze(self, audio_data):
        """Return an emotion->score dict for the given audio clip."""
        acoustic = self.extract_acoustic_features(audio_data)
        return self.classify_voice_emotion(acoustic)

    def extract_acoustic_features(self, audio_data):
        """Extract acoustic features for emotion classification.

        Placeholder implementation: draws each statistic from a normal
        distribution instead of analyzing the waveform. Real features
        would include pitch, energy/intensity, speaking rate, spectral
        shape, and voice-quality measures.
        """
        return {
            'pitch_mean': np.random.normal(150, 30),  # Hz
            'pitch_std': np.random.normal(20, 5),
            'energy_mean': np.random.normal(0.5, 0.1),
            'energy_std': np.random.normal(0.1, 0.02),
            'speaking_rate': np.random.normal(3.5, 0.5),  # syllables/sec
            'spectral_centroid': np.random.normal(2000, 500),
            'spectral_rolloff': np.random.normal(4000, 1000),
        }

    def classify_voice_emotion(self, features):
        """Rule-based mapping from acoustic features to emotion scores.

        Multiple rules can fire at once; 'neutral' is emitted only when
        nothing else matched.
        """
        scores = {}
        # Happy voices run higher-pitched with more energy variation.
        if features['pitch_mean'] > 160 and features['energy_std'] > 0.1:
            scores['happy'] = 0.7
        # Sad voices drop in pitch and overall energy.
        if features['pitch_mean'] < 140 and features['energy_mean'] < 0.4:
            scores['sad'] = 0.6
        # Anger shows as loud, fast speech.
        if features['energy_mean'] > 0.6 and features['speaking_rate'] > 4:
            scores['angry'] = 0.6
        # Fear pairs a high pitch with a very fast rate.
        if features['pitch_mean'] > 170 and features['speaking_rate'] > 4.5:
            scores['fear'] = 0.5
        # Surprise shows as large pitch swings.
        if features['pitch_std'] > 30:
            scores['surprise'] = 0.6
        # Nothing matched: fall back to neutral.
        if not scores:
            scores['neutral'] = 0.8
        return scores
class GestureEmotionAnalyzer:
    """Scores emotions from detected gestures and body posture cues."""

    def __init__(self):
        # Gesture types that count as evidence for each emotion.
        self.emotion_gestures = {
            'happy': ['open_palm', 'arms_raised', 'upward_movement'],
            'sad': ['slumped_posture', 'slow_movements', 'head_down'],
            'angry': ['fist', 'abrupt_movements', 'tension'],
            'fear': ['trembling', 'protective_posture', 'rapid_movements'],
            'surprise': ['wide_eyes', 'raised_eyebrows', 'sudden_movement']
        }

    def analyze(self, gesture_data):
        """Accumulate emotion scores from gestures (+0.2 each) and posture.

        `gesture_data` is a dict with optional 'gestures' (list of dicts
        with a 'type') and 'posture' (flags like 'slumped'/'tense') keys.
        """
        scores = {}
        # Each matching gesture adds a fixed 0.2 to its emotion.
        for detected in gesture_data.get('gestures', []):
            for emotion, evidence in self.emotion_gestures.items():
                if detected['type'] in evidence:
                    scores[emotion] = scores.get(emotion, 0) + 0.2
        # Posture flags add their own weighted evidence.
        posture_flags = gesture_data.get('posture', {})
        if posture_flags.get('slumped', False):
            scores['sad'] = scores.get('sad', 0) + 0.3
        if posture_flags.get('tense', False):
            scores['angry'] = scores.get('angry', 0) + 0.2
        return scores
5.2 Emotion Expression
class EmotionExpression:
    """Generates robot facial and body expressions for emotions.

    Fix: ``body_expressions`` previously had no 'neutral' entry, so
    ``express_emotion`` raised KeyError on EVERY call — the fallback
    ``self.body_expressions.get(emotion, self.body_expressions['neutral'])``
    evaluates its default argument eagerly. A neutral body entry is now
    defined, restoring the intended fallback behavior.
    """

    def __init__(self):
        # Facial expression parameters per emotion. Numeric values are
        # scaled by intensity; string values are categorical poses.
        self.expressions = {
            'happy': {
                'mouth_shape': 'smile',
                'eye_opening': 0.8,
                'eyebrow_position': 'raised',
                'head_tilt': 0.1
            },
            'sad': {
                'mouth_shape': 'frown',
                'eye_opening': 0.3,
                'eyebrow_position': 'lowered',
                'head_tilt': -0.2
            },
            'angry': {
                'mouth_shape': 'tight',
                'eye_opening': 0.9,
                'eyebrow_position': 'lowered_centered',
                'head_tilt': 0
            },
            'surprise': {
                'mouth_shape': 'open',
                'eye_opening': 1.0,
                'eyebrow_position': 'raised',
                'head_tilt': 0.1
            },
            'neutral': {
                'mouth_shape': 'relaxed',
                'eye_opening': 0.5,
                'eyebrow_position': 'neutral',
                'head_tilt': 0
            }
        }
        # Body language parameters per emotion. The 'neutral' entry is the
        # fallback used for any emotion without its own body profile.
        self.body_expressions = {
            'happy': {
                'posture': 'upright',
                'arm_position': 'relaxed',
                'movement': 'smooth',
                'energy': 'high'
            },
            'sad': {
                'posture': 'slumped',
                'arm_position': 'down',
                'movement': 'slow',
                'energy': 'low'
            },
            'angry': {
                'posture': 'tense',
                'arm_position': 'crossed',
                'movement': 'jerky',
                'energy': 'high'
            },
            'surprise': {
                'posture': 'upright',
                'arm_position': 'raised',
                'movement': 'quick',
                'energy': 'medium'
            },
            # Added: fallback profile (previously missing, causing KeyError).
            'neutral': {
                'posture': 'upright',
                'arm_position': 'relaxed',
                'movement': 'smooth',
                'energy': 'medium'
            }
        }

    def express_emotion(self, emotion, intensity=1.0, duration=2.0):
        """Generate a timed expression sequence for `emotion`.

        Args:
            emotion: key into self.expressions; unknown emotions print a
                warning and return None.
            intensity: scales the numeric (float) facial parameters.
            duration: total sequence length in seconds.

        Returns:
            List of per-frame expression dicts, or None for unknown emotions.
        NOTE(review): intensity is applied here AND per-frame inside
        generate_expression_sequence, so float params are scaled twice —
        preserved as-is; confirm intended.
        """
        if emotion not in self.expressions:
            print(f"Unknown emotion: {emotion}")
            return None
        # Copy so intensity scaling never mutates the stored template.
        facial_params = self.expressions[emotion].copy()
        for param in facial_params:
            if isinstance(facial_params[param], float):
                facial_params[param] *= intensity
        # Fall back to the neutral body profile for emotions without one.
        body_params = self.body_expressions.get(emotion, self.body_expressions['neutral'])
        # Build the per-frame fade-in / hold / fade-out sequence.
        expression_sequence = self.generate_expression_sequence(
            facial_params,
            body_params,
            duration
        )
        return expression_sequence

    def generate_expression_sequence(self, facial_params, body_params, duration):
        """Generate a 30 fps sequence: fade in, hold, fade out.

        Returns one dict per frame with 'time', 'facial', and 'body' keys.
        """
        fps = 30
        num_frames = int(duration * fps)
        sequence = []
        # Envelope keyframes: neutral -> full by 0.2 s -> hold -> neutral.
        keyframes = [
            {'time': 0.0, 'intensity': 0.0},            # Start neutral
            {'time': 0.2, 'intensity': 1.0},            # Fade in
            {'time': duration * 0.8, 'intensity': 1.0}, # Hold
            {'time': duration, 'intensity': 0.0}        # Fade out
        ]
        for frame in range(num_frames):
            t = frame / fps
            # Envelope intensity at this instant.
            intensity = self.interpolate_intensity(t, keyframes)
            frame_expression = {
                'time': t,
                'facial': self.interpolate_facial_expression(facial_params, intensity),
                'body': self.interpolate_body_expression(body_params, intensity)
            }
            sequence.append(frame_expression)
        return sequence

    def interpolate_intensity(self, t, keyframes):
        """Linearly interpolate envelope intensity at time `t`.

        Returns 0 for times outside every keyframe segment.
        """
        for i in range(len(keyframes) - 1):
            if keyframes[i]['time'] <= t <= keyframes[i + 1]['time']:
                t1, i1 = keyframes[i]['time'], keyframes[i]['intensity']
                t2, i2 = keyframes[i + 1]['time'], keyframes[i + 1]['intensity']
                alpha = (t - t1) / (t2 - t1)
                return i1 + alpha * (i2 - i1)
        return 0

    def interpolate_facial_expression(self, base_params, intensity):
        """Scale numeric facial params by intensity; gate categorical ones.

        Categorical (string) params switch to 'neutral' below 0.5 intensity.
        """
        expression = {}
        for param, value in base_params.items():
            if isinstance(value, (int, float)):
                expression[param] = value * intensity
            else:
                expression[param] = value if intensity > 0.5 else 'neutral'
        return expression

    def interpolate_body_expression(self, base_params, intensity):
        """Gate categorical body params: active above 0.3 intensity, else neutral."""
        expression = {}
        for param, value in base_params.items():
            expression[param] = value if intensity > 0.3 else 'neutral'
        return expression

    def generate_contextual_response(self, human_emotion, context):
        """Choose and express an emotional response to the human's emotion.

        Strategy is empathetic mirroring by default, or social-norm
        responses when context disables empathy_mode. Intensity is damped
        in formal settings and boosted in informal ones.
        NOTE(review): several target labels ('concerned', 'supportive',
        'calm', ...) have no entry in self.expressions, so express_emotion
        will report them unknown and return None — confirm intended.
        """
        response_rules = {
            'happy': {
                'empathy': 'happy',
                'social_norm': 'happy'
            },
            'sad': {
                'empathy': 'concerned',
                'social_norm': 'supportive'
            },
            'angry': {
                'empathy': 'calm',
                'social_norm': 'apologetic'
            },
            'fear': {
                'empathy': 'reassuring',
                'social_norm': 'confident'
            },
            'surprise': {
                'empathy': 'curious',
                'social_norm': 'engaged'
            }
        }
        # Pick the response strategy from context.
        if context.get('empathy_mode', True):
            strategy = 'empathy'
        else:
            strategy = 'social_norm'
        target_emotion = response_rules.get(human_emotion, {}).get(strategy, 'neutral')
        # Modulate expressiveness by social setting.
        base_intensity = 0.7
        if context.get('formal_setting', False):
            base_intensity *= 0.5
        elif context.get('informal_setting', False):
            base_intensity *= 1.2
        return self.express_emotion(target_emotion, intensity=base_intensity, duration=2.0)

    def blend_emotions(self, emotions_with_weights):
        """Blend several emotions into one expression by normalized weight.

        Numeric facial params are weight-averaged; categorical body params
        accumulate per-value weights and resolve to the heaviest value.
        """
        blended_expression = {
            'facial': {},
            'body': {}
        }
        total_weight = sum(emotions_with_weights.values())
        for emotion, weight in emotions_with_weights.items():
            if emotion in self.expressions:
                factor = weight / total_weight
                # Weighted average of numeric facial parameters.
                facial = self.expressions[emotion]
                for param, value in facial.items():
                    if isinstance(value, (int, float)):
                        blended_expression['facial'][param] = \
                            blended_expression['facial'].get(param, 0) + value * factor
                # Accumulate categorical body-parameter vote weights.
                if emotion in self.body_expressions:
                    body = self.body_expressions[emotion]
                    for param, value in body.items():
                        if param not in blended_expression['body']:
                            blended_expression['body'][param] = {}
                        # (Simplified from an if/else whose branches were
                        # equivalent: always add factor to the value's tally.)
                        blended_expression['body'][param][value] = \
                            blended_expression['body'][param].get(value, 0) + factor
        # Resolve each body parameter to its highest-weighted value.
        for param in blended_expression['body']:
            if isinstance(blended_expression['body'][param], dict):
                max_state = max(blended_expression['body'][param],
                                key=blended_expression['body'][param].get)
                blended_expression['body'][param] = max_state
        return {
            'facial': blended_expression['facial'],
            'body': blended_expression['body']
        }
6. Complete HRI System Integration
6.1 Integrated Interaction System
class IntegratedHRISystem:
    """Top-level HRI loop wiring together speech, gesture, attention,
    intention, planning, coordination, and emotion components.

    Fixes:
    - process_multimodal_input contained a syntax error
      (``self.emotion_recognize emotions(frame)``); it now calls
      ``self.emotion_recognizer.recognize_emotions(frame)``.
    - interaction_state['mode'] was never updated after construction, so
      the on-screen status and the 'exit' check were dead; generate_response
      now records the mode it determines.

    NOTE(review): uses ``cv2``, ``np`` and ``time``, none imported in this
    chunk — confirm file-top imports. Also, update_interaction_state
    indexes the result of extract_intent_and_entities as a dict
    (``speech_result['intent']``) while the version at the top of this
    file appears to return a ``(intent, entities)`` tuple — confirm the
    actual return shape.
    """

    def __init__(self):
        # Component instances (classes defined earlier in this file).
        self.speech_processor = SpeechProcessor()
        self.dialogue_manager = DialogueManager()
        self.gesture_recognizer = GestureRecognizer()
        self.gesture_generator = GestureGenerator()
        self.attention_estimator = AttentionEstimator()
        self.intention_recognizer = IntentionRecognizer()
        self.task_planner = TaskPlanner()
        self.task_coordinator = TaskCoordinator()
        self.emotion_recognizer = EmotionRecognizer()
        self.emotion_expression = EmotionExpression()
        # Mutable interaction state shared across the loop iterations.
        self.interaction_state = {
            'mode': 'idle',
            'current_task': None,
            'user_emotion': 'neutral',
            'joint_attention': False,
            'trust_level': 0.5
        }
        # Hardware interfaces (None until wired to a real robot).
        self.robot_controller = None  # Would be actual robot interface
        self.sensors = None           # Would be actual sensor interface

    def start_interaction(self):
        """Run the main perceive -> decide -> act loop on the default camera."""
        print("Starting Human-Robot Interaction System")
        print("Say 'hello' or wave to begin...")
        # Open the default camera.
        cap = cv2.VideoCapture(0)
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Perceive all modalities for this frame.
            self.process_multimodal_input(frame)
            # Fold new observations into the interaction state.
            self.update_interaction_state()
            # Decide on a response given the current state.
            response = self.generate_response()
            if response:
                self.execute_response(response)
            # Show status overlay.
            self.display_interaction_status(frame)
            # Leave the loop when an exit mode is reached.
            if self.interaction_state['mode'] == 'exit':
                break
            cv2.waitKey(30)
        cap.release()
        cv2.destroyAllWindows()

    def process_multimodal_input(self, frame):
        """Capture speech, gestures, attention and emotion for one frame."""
        # Speech input (simulated trigger to avoid blocking every frame).
        speech_text = None
        if np.random.random() < 0.01:  # Simulate speech detection
            speech_text = self.speech_processor.listen_for_speech(timeout=1)
        # Gesture recognition on the raw frame.
        processed_frame, gestures = self.gesture_recognizer.recognize_gestures(frame)
        # Attention estimation (gaze / head pose).
        attention_info = self.attention_estimator.estimate_attention(frame)
        # Emotion recognition (fixed: was a syntax error —
        # "self.emotion_recognize emotions(frame)").
        emotion_info = self.emotion_recognizer.recognize_emotions(frame)
        # Store the fused observation for update_interaction_state.
        self.current_input = {
            'frame': frame,
            'speech': speech_text,
            'gestures': gestures,
            'attention': attention_info,
            'emotions': emotion_info,
            'timestamp': time.time()
        }

    def update_interaction_state(self):
        """Fold the latest multimodal observation into interaction_state."""
        input_data = self.current_input
        # Speech: extract intent from the recognized utterance.
        if input_data['speech']:
            speech_result = self.speech_processor.extract_intent_and_entities(
                input_data['speech']
            )
            # NOTE(review): dict-style access — confirm against the tuple
            # return seen in SpeechProcessor.extract_intent_and_entities.
            self.interaction_state['last_speech_intent'] = speech_result['intent']
        # Gestures: remember the most recent gesture type.
        if input_data['gestures']:
            self.interaction_state['last_gesture'] = input_data['gestures'][0]['type']
        # Attention: joint attention relative to the frame center.
        if input_data['attention']:
            robot_gaze = (320, 240)  # Center of frame
            self.interaction_state['joint_attention'] = \
                self.attention_estimator.track_joint_attention(
                    robot_gaze,
                    input_data['attention']
                )
        # Emotion: keep the dominant recognized emotion.
        if input_data['emotions']:
            self.interaction_state['user_emotion'] = \
                input_data['emotions']['dominant_emotion']
        # Intention: combine gesture and speech evidence.
        speech_intent = self.interaction_state.get('last_speech_intent')
        if speech_intent or input_data['gestures']:
            intention = self.intention_recognizer.recognize_intention(
                input_data['gestures'],
                input_data['speech']
            )
            self.interaction_state['recognized_intention'] = intention

    def generate_response(self):
        """Choose a response appropriate to the current interaction mode.

        Also records the determined mode in interaction_state (fix: the
        mode was previously computed but never stored, leaving the status
        display and exit check stuck at 'idle').
        """
        mode = self.determine_interaction_mode()
        self.interaction_state['mode'] = mode
        if mode == 'greeting':
            return {
                'type': 'greeting',
                'speech': self.dialogue_manager.select_template('greeting'),
                'gesture': 'wave',
                'expression': 'happy'
            }
        elif mode == 'task_execution':
            return self.generate_task_response()
        elif mode == 'conversation':
            return self.generate_conversational_response()
        elif mode == 'emotional_support':
            return self.generate_emotional_response()
        return None

    def determine_interaction_mode(self):
        """Classify the situation into a coarse interaction mode."""
        state = self.interaction_state
        # Greeting: spoken greeting or a wave.
        if (state.get('last_speech_intent') == 'greeting' or
                state.get('last_gesture') == 'waving'):
            return 'greeting'
        # Task execution: explicit request intent.
        if (state.get('last_speech_intent') == 'request' or
                state.get('recognized_intention', {}).get('intention') == 'request_object'):
            return 'task_execution'
        # Emotional support: negative user emotion takes priority next.
        if state['user_emotion'] in ['sad', 'angry', 'fear']:
            return 'emotional_support'
        # Any other speech keeps a conversation going.
        if state.get('last_speech_intent'):
            return 'conversation'
        return 'idle'

    def generate_task_response(self):
        """Plan a collaborative task from the recognized intention."""
        intention = self.interaction_state.get('recognized_intention', {})
        task_type = intention.get('intention', 'general')
        # Decompose, allocate between human and robot, and plan.
        subtasks = self.task_planner.decompose_task(task_type)
        human_capabilities = {'perception': 0.7, 'manipulation': 0.8}
        allocation = self.task_planner.allocate_tasks(subtasks, human_capabilities)
        plan = self.task_planner.generate_collaboration_plan(allocation)
        return {
            'type': 'task',
            'speech': f"I'll help you with {task_type}. Let's work together.",
            'plan': plan,
            'expression': 'neutral'
        }

    def generate_conversational_response(self):
        """Produce a dialogue reply plus a matching gesture."""
        speech_text = self.current_input.get('speech')
        if speech_text:
            speech_result = self.speech_processor.extract_intent_and_entities(speech_text)
            response = self.dialogue_manager.process_input(speech_result)
            return {
                'type': 'conversation',
                'speech': response,
                'gesture': self.gesture_generator.generate_contextual_gesture(
                    speech_result['intent'],
                    speech_result['entities']
                ),
                'expression': 'neutral'
            }
        return None

    def generate_emotional_response(self):
        """Produce an empathetic reply to a negative user emotion."""
        user_emotion = self.interaction_state['user_emotion']
        # Expression chosen to de-escalate / comfort.
        empathetic_expressions = {
            'sad': 'concerned',
            'angry': 'calm',
            'fear': 'reassuring'
        }
        target_expression = empathetic_expressions.get(user_emotion, 'supportive')
        responses = {
            'sad': "I notice you seem upset. Is there anything I can do to help?",
            'angry': "I can see you're frustrated. Let's take a moment and address the issue calmly.",
            'fear': "You seem worried. I'm here to help and we can face this together."
        }
        return {
            'type': 'emotional_support',
            'speech': responses.get(user_emotion, "How are you feeling?"),
            'expression': target_expression,
            'gesture': 'gentle_nod'
        }

    def execute_response(self, response):
        """Act out a response dict: speech, gesture, expression, and plan."""
        if not response:
            return
        # Spoken output.
        if response.get('speech'):
            self.speak(response['speech'])
        # Gesture output (only when a robot controller is attached).
        if response.get('gesture'):
            gesture_sequence = self.gesture_generator.generate_gesture(
                response['gesture']
            )
            if gesture_sequence and self.robot_controller:
                self.robot_controller.execute_gesture(gesture_sequence)
        # Facial expression output.
        if response.get('expression'):
            expression_sequence = self.emotion_expression.express_emotion(
                response['expression']
            )
            if expression_sequence and self.robot_controller:
                self.robot_controller.set_facial_expression(expression_sequence)
        # Collaborative task execution.
        if response.get('plan'):
            self.execute_collaborative_task(response['plan'])

    def execute_collaborative_task(self, plan):
        """Hand a collaborative plan to the coordinator (stub: prints only)."""
        # This would integrate with the actual robot controller.
        print(f"Executing collaborative task with {len(plan['steps'])} steps")
        self.task_coordinator.current_plan = plan

    def speak(self, text):
        """Text-to-speech output (stub: prints to console)."""
        print(f"Robot: {text}")
        # Would call actual TTS system.

    def display_interaction_status(self, frame):
        """Overlay mode / emotion / joint-attention status on the frame."""
        status_text = [
            f"Mode: {self.interaction_state['mode']}",
            f"Emotion: {self.interaction_state['user_emotion']}",
            f"Joint Attention: {self.interaction_state['joint_attention']}"
        ]
        y_offset = 30
        for text in status_text:
            cv2.putText(frame, text, (10, y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            y_offset += 30
        cv2.imshow('HRI System', frame)
# Main execution
if __name__ == "__main__":
    # Build the integrated HRI system and enter its interaction loop.
    IntegratedHRISystem().start_interaction()
Conclusion
This chapter explored the multifaceted domain of Human-Robot Interaction for humanoid robots, covering:
- Natural Language Processing: Enabling conversational interaction
- Gesture Recognition: Understanding non-verbal communication
- Social Cognition: Developing theory of mind capabilities
- Collaborative Planning: Coordinating joint human-robot tasks
- Emotional Intelligence: Recognizing and expressing emotions
- System Integration: Creating comprehensive interactive systems
Effective HRI requires seamless integration of multiple perception, cognition, and action systems. As humanoid robots become more sophisticated, their ability to understand and respond to human social cues will be crucial for acceptance and effectiveness in real-world applications.
Key Takeaways
- Multimodal Understanding: Combining speech, gestures, and context improves comprehension
- Social Norms: Robots must understand and follow social conventions
- Adaptive Behavior: Responses should adapt to individual users and situations
- Emotional Awareness: Recognizing and appropriately responding to emotions is essential
- Collaboration: Successful HRI requires natural coordination and turn-taking
- Trust Building: Consistent, predictable behavior builds user trust
Previous: Chapter 13 - Machine Learning in Robotics | Next: Chapter 15 - Future of Physical AI