diff --git a/mbodied/EKF/__init__.py b/mbodied/EKF/__init__.py
new file mode 100644
index 0000000..568bf9c
--- /dev/null
+++ b/mbodied/EKF/__init__.py
@@ -0,0 +1,3 @@
+from .extended_kalman_filter import ExtendedKalmanFilter
+
+__all__ = ["ExtendedKalmanFilter"]
\ No newline at end of file
diff --git a/mbodied/EKF/extended_kalman_filter.py b/mbodied/EKF/extended_kalman_filter.py
new file mode 100644
index 0000000..46e9b77
--- /dev/null
+++ b/mbodied/EKF/extended_kalman_filter.py
@@ -0,0 +1,205 @@
+import numpy as np
+
+class ExtendedKalmanFilter:
+    def __init__(
+        self,
+        state_dim,
+        control_dim,
+        observation_dim,
+        initial_state=None,
+        initial_covariance=None,
+        process_noise_cov=None,
+        measurement_noise_cov=None,
+        uncertainty_percentage=0.1,
+        is_linear_f=False,
+        is_linear_h=False
+    ):
+        """
+        Initializes the EKF with dimensions for the state, control input, and observation vectors.
+
+        For reference, see chapter 3.3 of Probabilistic Robotics:
+        https://gaoyichao.com/Xiaotu/resource/refs/PR.MIT.en.pdf
+
+        Args:
+        - state_dim (int): Dimension of the state vector.
+        - control_dim (int): Dimension of the control vector.
+        - observation_dim (int): Dimension of the observation vector.
+        - initial_state (np.ndarray or None): Optional initial state vector.
+        - initial_covariance (np.ndarray or None): Optional initial covariance matrix.
+        - process_noise_cov (np.ndarray or None): Optional process noise covariance Q.
+        - measurement_noise_cov (np.ndarray or None): Optional measurement noise covariance R.
+        - uncertainty_percentage (float): Fraction of each initial state value used as its
+          variance when no initial covariance is given.
+        - is_linear_f (bool): If True, use F directly instead of a numerical Jacobian of f.
+        - is_linear_h (bool): If True, use H directly instead of a numerical Jacobian of h.
+        """
+        self.state_dim = state_dim
+        self.control_dim = control_dim
+        self.observation_dim = observation_dim
+        self.is_linear_f = is_linear_f
+        self.is_linear_h = is_linear_h
+
+        if initial_state is not None:
+            self.x = self.to_column_vector(initial_state)
+        else:
+            self.x = np.zeros((state_dim, 1))
+
+        if initial_covariance is not None:
+            self.P = initial_covariance
+        else:
+            if np.all(self.x == 0):
+                self.P = np.eye(state_dim)
+            else:
+                variances = np.array([uncertainty_percentage * abs(value) for value in self.x.flatten()])
+                self.P = np.diag(variances)
+
+        self.Q = process_noise_cov if process_noise_cov is not None else np.eye(state_dim) * 0.5
+        self.R = measurement_noise_cov if measurement_noise_cov is not None else np.eye(observation_dim) * 0.8
+
+        self.F = np.eye(state_dim)
+        self.B = np.zeros((state_dim, control_dim))
+        self.H = np.eye(observation_dim, state_dim)
+
+    def predict(self, u):
+        """
+        Prediction step of the EKF.
+
+        Note: Probabilistic Robotics writes the process noise as Rt; this class calls it Q.
+
+        Args:
+        - u (np.array): Control input vector.
+        """
+        u = self.to_column_vector(u)
+
+        F_jacobian = self.jacobian_f(self.x, u)  # Gt, evaluated at the prior mean μt−1
+
+        self.x = self.f(self.x, u)  # ¯μt = g(ut, μt−1)
+
+        self.P = F_jacobian @ self.P @ F_jacobian.T + self.Q  # ¯Σt = Gt Σt−1 Gtᵀ + Qt
+
+    def update(self, z):
+        """
+        Update step of the EKF based on observation z.
+
+        Args:
+        - z (np.array): Observation vector.
+        """
+        z = self.to_column_vector(z)
+        z_pred = self.h(self.x)  # h(¯μt)
+
+        H_jacobian = self.jacobian_h(self.x)  # Ht
+
+        y = z - z_pred  # zt − h(¯μt)
+
+        S = H_jacobian @ self.P @ H_jacobian.T + self.R  # Ht ¯Σt Htᵀ + Rt
+
+        K = self.P @ H_jacobian.T @ np.linalg.inv(S)  # Kt
+
+        self.x = self.x + K @ y  # μt = ¯μt + Kt(zt − h(¯μt))
+
+        self.P = (np.eye(self.state_dim) - K @ H_jacobian) @ self.P  # Σt = (I − Kt Ht) ¯Σt
+ """ + return self.F @ x + self.B @ u + + def jacobian_f(self, x, u, epsilon=1e-5): + """ + Compute the Jacobian of the state transition function f with respect to the state x. + + Args: + - x (np.array): Current state vector. + - u (np.array): Control input vector. + + Returns: + - (np.array): Jacobian matrix of f with respect to x. + """ + if self.is_linear_f: + return self.F + + state_dim = x.shape[0] + F_jacobian = np.zeros((state_dim, state_dim)) + + for i in range(state_dim): + x_perturbed = np.copy(x) + x_perturbed[i] += epsilon + + F_jacobian[:, i] = (self.f(x_perturbed, u) - self.f(x, u)).flatten() / epsilon + + return F_jacobian + + def h(self, x): + """ + Non-linear observation function. To be defined by the specific system model. + + Args: + - x (np.array): Current state vector. + + Returns: + - (np.array): Predicted observation. + """ + return self.H @ x + + def jacobian_h(self, x, epsilon=1e-5): + """ + Compute the Jacobian of the observation function h with respect to the state x. + + Args: + - x (np.array): Current state vector. + + Returns: + - (np.array): Jacobian matrix of h with respect to x. + """ + if self.is_linear_h: + return self.H + + observation_dim = self.h(x).shape[0] + state_dim = x.shape[0] + H_jacobian = np.zeros((observation_dim, state_dim)) + + for i in range(state_dim): + x_perturbed = np.copy(x) + x_perturbed[i] += epsilon + + H_jacobian[:, i] = (self.h(x_perturbed) - self.h(x)).flatten() / epsilon + + return H_jacobian + + def get_state(self): + """ + Returns the current state estimate. + + Returns: + np.ndarray: The current state estimate vector. + """ + return self.x + + def get_covariance(self): + """ + Returns the current covariance matrix. + + Returns: + np.ndarray: The current state covariance matrix. + """ + return self.P + + def get_measurement_prediction(self): + """ + Computes the predicted measurement based on the current state estimate. + + Returns: + np.ndarray: The predicted measurement vector. + """ + return self.h(self.x) + + def to_column_vector(self, v): + """ + Converts a vector to a column vector if it isn't already. + + Args: + - v (np.array): Input vector. + + Returns: + - (np.array): Column vector. 
+ # """ + return v.reshape(-1, 1) if v.ndim == 1 else v \ No newline at end of file diff --git a/mbodied/EKF/tests/test_ekf.py b/mbodied/EKF/tests/test_ekf.py new file mode 100644 index 0000000..e2e00d9 --- /dev/null +++ b/mbodied/EKF/tests/test_ekf.py @@ -0,0 +1,85 @@ +from mbodied.EKF.extended_kalman_filter import ExtendedKalmanFilter +from mbodied.EKF.world import world_to_vector +from mbodied.types.sense.world import World, WorldObject, BBox2D, BBox3D, Pose, PixelCoords +from mbodied.types.sense.vision import Image +from mbodied.types.motion.control import HandControl +from mbodied.EKF.trajectory import HandControlVector +import numpy as np + +def test_ekf(): + world = World( + image=Image(path="resources/color_image.png"), + objects=[ + WorldObject( + name="box", + bbox_2d=BBox2D(10, 20, 50, 60), + bbox_3d=BBox3D(0, 0, 0, 1, 1, 1), + pose=Pose(x=1.0, y=2.0, z=3.0, roll=0.1, pitch=0.2, yaw=0.3), + pixel_coords=PixelCoords(100, 150) + ), + WorldObject( + name="sphere", + bbox_2d=BBox2D(15, 25, 55, 65), + bbox_3d=BBox3D(1, 1, 1, 2, 2, 2), + pose=Pose(x=4.0, y=5.0, z=6.0, roll=0.4, pitch=0.5, yaw=0.6), + pixel_coords=PixelCoords(110, 160) + ) + ] + ) + + hand = HandControl.unflatten([20, 30, 10, 40, 50, 70, 1]) + + world_vector = world_to_vector(world) + motion_vector = HandControlVector(hand).to_vector() + state_vector = np.concatenate([world_vector, motion_vector]) + + state_dim = state_vector.size + control_dim = motion_vector.size + observation_dim = state_vector.size + Q = np.eye(state_dim) * 0.1 + R = np.eye(observation_dim) * 0.5 + + ekf = ExtendedKalmanFilter(state_dim, control_dim, observation_dim, initial_state=state_vector, process_noise_cov=Q, measurement_noise_cov=R) + + num_iterations = 10 + innovations = [] + state_estimates = [] + robot_poses = [] + + for i in range(num_iterations): + print(f"\n--- Iteration {i + 1} ---") + + control_input = np.random.randint(1, 100, size=7) + + ekf.predict(control_input) + + predicted_state = ekf.get_state().flatten() + print(f"Predicted State: {predicted_state}") + + noise = np.random.normal(0, 0.1, observation_dim) + simulated_observation = predicted_state + noise + print(f"Simulated Observation: {simulated_observation}") + + ekf.update(simulated_observation) + + updated_state = ekf.get_state().flatten() + print(f"Updated State Estimate: {updated_state}") + + predicted_measurement = ekf.get_measurement_prediction() + innovation = simulated_observation - predicted_measurement + innovations.append(innovation) + print(f"Innovation: {innovation}") + + state_estimates.append(updated_state) + + robot_pose = { + "position": updated_state[-7:-4], + "orientation": updated_state[-4:-1] + } + robot_poses.append(robot_pose) + + print(f"Robot POSE: {robot_poses}") + + return state_estimates, innovations + +test_ekf() diff --git a/mbodied/EKF/tests/test_tree.py b/mbodied/EKF/tests/test_tree.py new file mode 100644 index 0000000..934cc60 --- /dev/null +++ b/mbodied/EKF/tests/test_tree.py @@ -0,0 +1,60 @@ +import unittest +from unittest import mock +from typing import List +from mbodied.EKF.tree_of_thought import TreeOfThought +from mbodied.agents.language import LanguageAgent + +@mock.patch( + "mbodied.agents.language.language_agent.LanguageAgent.act", + side_effect = [ + '["move hand to the right", "close hand"]', + '["move hand forward", "open hand"]', + '["rotate wrist clockwise", "close hand"]', + '["move hand up", "open hand"]', + '["move hand down", "close hand"]', + '["slide hand left", "hold position"]', + '["move hand diagonally right-up", 
"open hand"]', + '["move hand diagonally left-down", "close hand"]', + '["shake hand", "open hand"]', + '["tap surface lightly", "open hand"]', + '["clench fist", "hold position"]', + '["move hand back", "close hand"]', + '["wave hand", "open hand"]', + '["stretch fingers", "hold position"]', + '["move hand in a circle", "close hand"]', + None +] +) +def test_tree_of_thought(mock_act): + + language_agent = mock.Mock() + + language_agent.act.side_effect = [ + '["move hand to the right", "close hand"]', + '["move hand forward", "open hand"]', + ] + + def parse_json_output(response: str) -> List[str]: + return eval(response) + + language_agent = LanguageAgent() + + tot = TreeOfThought(language_agent, depth=2) + + root = tot.generate_thoughts("What should the robot do next?", parse_function=parse_json_output) + + assert root.state == "move hand to the right", f"Expected root state to be 'move hand to the right', got {root.state}" + + assert len(root.children) == 1, f"Expected root to have 2 children, got {len(root.children)}" + + first_child = root.children[0] + assert first_child.state == "close hand", f"Expected first child state to be 'close hand', got {first_child.state}" + + nested_child = root.children[0].children[0] + assert nested_child.state == "move hand forward", f"Expected second child state to be 'move hand forward', got {nested_child.state}" + + tot.traverse_tree(root) + + +if __name__ == "__main__": + unittest.main() diff --git a/mbodied/EKF/trajectory/__init__.py b/mbodied/EKF/trajectory/__init__.py new file mode 100644 index 0000000..8dd1443 --- /dev/null +++ b/mbodied/EKF/trajectory/__init__.py @@ -0,0 +1,25 @@ +from .motion_vector import ( + LocationAngleVector, + PoseVector, + JointControlVector, + FullJointControlVector, + HandControlVector, + HeadControlVector, + MobileSingleHandControlVector, + MobileSingleArmControlVector, + MobileBimanualArmControlVector, + HumanoidControlVector +) + +__all__ = [ + "LocationAngleVector", + "PoseVector", + "JointControlVector", + "FullJointControlVector", + "HandControlVector", + "HeadControlVector", + "MobileSingleHandControlVector", + "MobileSingleArmControlVector", + "MobileBimanualArmControlVector", + "HumanoidControlVector" +] \ No newline at end of file diff --git a/mbodied/EKF/trajectory/motion_vector.py b/mbodied/EKF/trajectory/motion_vector.py new file mode 100644 index 0000000..81f41e7 --- /dev/null +++ b/mbodied/EKF/trajectory/motion_vector.py @@ -0,0 +1,129 @@ +import numpy as np +from mbodied.types.motion.control import ( + LocationAngle, + Pose, + JointControl, + FullJointControl, + HandControl, + HeadControl, + MobileSingleHandControl, + MobileSingleArmControl, + MobileBimanualArmControl, + HumanoidControl +) + + +class LocationAngleVector: + def __init__(self, base: LocationAngle): + self.base = base + + def to_vector(self): + return np.array([ + self.base.x, + self.base.y, + self.base.theta + ]) + + +class PoseVector: + def __init__(self, pose: Pose): + self.pose = pose + + def to_vector(self): + return np.array([ + self.pose.x, + self.pose.y, + self.pose.z, + self.pose.roll, + self.pose.pitch, + self.pose.yaw + ]) + + +class JointControlVector: + def __init__(self, joint_control: JointControl): + self.joint_control = joint_control + + def to_vector(self): + return np.array([self.joint_control.value]) + + +class FullJointControlVector: + def __init__(self, full_joint_control: FullJointControl): + self.full_joint_control = full_joint_control + + def to_vector(self): + joint_values = [joint.value for joint in 
diff --git a/mbodied/EKF/trajectory/motion_vector.py b/mbodied/EKF/trajectory/motion_vector.py
new file mode 100644
index 0000000..81f41e7
--- /dev/null
+++ b/mbodied/EKF/trajectory/motion_vector.py
@@ -0,0 +1,129 @@
+import numpy as np
+from mbodied.types.motion.control import (
+    LocationAngle,
+    Pose,
+    JointControl,
+    FullJointControl,
+    HandControl,
+    HeadControl,
+    MobileSingleHandControl,
+    MobileSingleArmControl,
+    MobileBimanualArmControl,
+    HumanoidControl
+)
+
+
+class LocationAngleVector:
+    def __init__(self, base: LocationAngle):
+        self.base = base
+
+    def to_vector(self):
+        return np.array([
+            self.base.x,
+            self.base.y,
+            self.base.theta
+        ])
+
+
+class PoseVector:
+    def __init__(self, pose: Pose):
+        self.pose = pose
+
+    def to_vector(self):
+        return np.array([
+            self.pose.x,
+            self.pose.y,
+            self.pose.z,
+            self.pose.roll,
+            self.pose.pitch,
+            self.pose.yaw
+        ])
+
+
+class JointControlVector:
+    def __init__(self, joint_control: JointControl):
+        self.joint_control = joint_control
+
+    def to_vector(self):
+        return np.array([self.joint_control.value])
+
+
+class FullJointControlVector:
+    def __init__(self, full_joint_control: FullJointControl):
+        self.full_joint_control = full_joint_control
+
+    def to_vector(self):
+        joint_values = [joint.value for joint in self.full_joint_control.joints]
+        return np.array(joint_values)
+
+
+class HandControlVector:
+    def __init__(self, hand_control: HandControl):
+        self.hand_control = hand_control
+
+    def to_vector(self):
+        pose_vector = PoseVector(self.hand_control.pose).to_vector()
+        grasp_value = np.array([self.hand_control.grasp.value])
+        return np.concatenate([pose_vector, grasp_value])
+
+
+class HeadControlVector:
+    def __init__(self, head_control: HeadControl):
+        self.head_control = head_control
+
+    def to_vector(self):
+        return np.array([
+            self.head_control.tilt.value,
+            self.head_control.pan.value
+        ])
+
+
+class MobileSingleHandControlVector:
+    def __init__(self, mobile_control: MobileSingleHandControl):
+        self.mobile_control = mobile_control
+
+    def to_vector(self):
+        base_vector = LocationAngleVector(self.mobile_control.base).to_vector() if self.mobile_control.base else np.array([])
+        hand_vector = HandControlVector(self.mobile_control.hand).to_vector() if self.mobile_control.hand else np.array([])
+        head_vector = HeadControlVector(self.mobile_control.head).to_vector() if self.mobile_control.head else np.array([])
+
+        return np.concatenate([base_vector, hand_vector, head_vector])
+
+
+class MobileSingleArmControlVector:
+    def __init__(self, mobile_control: MobileSingleArmControl):
+        self.mobile_control = mobile_control
+
+    def to_vector(self):
+        base_vector = LocationAngleVector(self.mobile_control.base).to_vector() if self.mobile_control.base else np.array([])
+        arm_vector = FullJointControlVector(self.mobile_control.arm).to_vector() if self.mobile_control.arm else np.array([])
+        head_vector = HeadControlVector(self.mobile_control.head).to_vector() if self.mobile_control.head else np.array([])
+
+        return np.concatenate([base_vector, arm_vector, head_vector])
+
+
+class MobileBimanualArmControlVector:
+    def __init__(self, mobile_bimanual_control: MobileBimanualArmControl):
+        self.mobile_bimanual_control = mobile_bimanual_control
+
+    def to_vector(self):
+        base_vector = LocationAngleVector(self.mobile_bimanual_control.base).to_vector() if self.mobile_bimanual_control.base else np.array([])
+        left_arm_vector = FullJointControlVector(self.mobile_bimanual_control.left_arm).to_vector() if self.mobile_bimanual_control.left_arm else np.array([])
+        right_arm_vector = FullJointControlVector(self.mobile_bimanual_control.right_arm).to_vector() if self.mobile_bimanual_control.right_arm else np.array([])
+        head_vector = HeadControlVector(self.mobile_bimanual_control.head).to_vector() if self.mobile_bimanual_control.head else np.array([])
+
+        return np.concatenate([base_vector, left_arm_vector, right_arm_vector, head_vector])
+
+
+class HumanoidControlVector:
+    def __init__(self, humanoid_control: HumanoidControl):
+        self.humanoid_control = humanoid_control
+
+    def to_vector(self):
+        left_arm_vector = FullJointControlVector(self.humanoid_control.left_arm).to_vector() if self.humanoid_control.left_arm else np.array([])
+        right_arm_vector = FullJointControlVector(self.humanoid_control.right_arm).to_vector() if self.humanoid_control.right_arm else np.array([])
+        left_leg_vector = FullJointControlVector(self.humanoid_control.left_leg).to_vector() if self.humanoid_control.left_leg else np.array([])
+        right_leg_vector = FullJointControlVector(self.humanoid_control.right_leg).to_vector() if self.humanoid_control.right_leg else np.array([])
+        head_vector = HeadControlVector(self.humanoid_control.head).to_vector() if self.humanoid_control.head else np.array([])
+
+        return np.concatenate([left_arm_vector, right_arm_vector, left_leg_vector, right_leg_vector, head_vector])
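+
+# A minimal usage sketch (editor's illustration, not part of the module), mirroring
+# tests/test_ekf.py: flatten a HandControl into the 7-element vector fed to the EKF.
+#
+#     from mbodied.types.motion.control import HandControl
+#
+#     hand = HandControl.unflatten([20, 30, 10, 40, 50, 70, 1])
+#     vec = HandControlVector(hand).to_vector()  # [x, y, z, roll, pitch, yaw, grasp]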
diff --git a/mbodied/EKF/tree_of_thought/__init__.py b/mbodied/EKF/tree_of_thought/__init__.py
new file mode 100644
index 0000000..910762f
--- /dev/null
+++ b/mbodied/EKF/tree_of_thought/__init__.py
@@ -0,0 +1,3 @@
+from .tree_of_thought import TreeNode, TreeOfThought
+
+__all__ = ["TreeNode", "TreeOfThought"]
\ No newline at end of file
diff --git a/mbodied/EKF/tree_of_thought/tree_of_thought.py b/mbodied/EKF/tree_of_thought/tree_of_thought.py
new file mode 100644
index 0000000..4a872d7
--- /dev/null
+++ b/mbodied/EKF/tree_of_thought/tree_of_thought.py
@@ -0,0 +1,190 @@
+from typing import List, Callable, Optional, Any
+from mbodied.agents.language import LanguageAgent
+
+class TreeNode:
+    """
+    A class representing a node in the Tree of Thought.
+    Reference: https://arxiv.org/pdf/2305.10601
+
+    Attributes:
+    -----------
+    state : str
+        The state or thought represented by this node.
+    children : List[TreeNode]
+        The list of child nodes, representing further thoughts or decisions.
+    """
+    def __init__(self, state: str) -> None:
+        """
+        Initialize the TreeNode with the given state.
+
+        Parameters:
+        -----------
+        state : str
+            The thought or state at this node.
+        """
+        self.state = state
+        self.children: List[TreeNode] = []
+
+    def add_child(self, child_state: str) -> 'TreeNode':
+        """
+        Add a child node with the given state to this node.
+
+        Parameters:
+        -----------
+        child_state : str
+            The state or thought to add as a child.
+
+        Returns:
+        --------
+        TreeNode
+            The newly added child node.
+        """
+        child_node = TreeNode(child_state)
+        self.children.append(child_node)
+        return child_node
+
+class TreeOfThought:
+    """
+    A class that generates and manages a Tree of Thought (ToT) using a LanguageAgent.
+
+    Attributes:
+    -----------
+    language_agent : LanguageAgent
+        The language agent used to generate decisions or text.
+    depth : int
+        The maximum depth of the tree to explore (i.e., how many steps forward to think).
+    root : Optional[TreeNode]
+        The root node of the decision tree, representing the initial thought.
+    """
+
+    def __init__(self, language_agent: 'LanguageAgent', depth: int = 3) -> None:
+        """
+        Initialize the Tree of Thought with a given LanguageAgent.
+
+        Parameters:
+        -----------
+        language_agent : LanguageAgent
+            The agent used to generate decisions or thoughts.
+        depth : int, optional
+            The maximum depth of the thought tree to expand (default is 3).
+        """
+        self.language_agent = language_agent
+        self.depth = depth
+        self.root: Optional[TreeNode] = None
+        self.parse_function: Optional[Callable[[str], List[str]]] = None
+
+    def generate_thoughts(
+        self,
+        prompt: str,
+        parse_function: Optional[Callable[[str], List[str]]] = None,
+        max_depth: Optional[int] = None,
+        image: Optional[Any] = None
+    ) -> TreeNode:
+        """
+        Generate a Tree of Thought based on the given prompt.
+
+        Parameters:
+        -----------
+        prompt : str
+            The initial prompt or command used to generate the first thought in the tree.
+        parse_function : Optional[Callable[[str], List[str]]], optional
+            A user-defined function to process the output of the LanguageAgent.
+            This function takes the raw output as input and returns a list of thoughts or child states.
+        max_depth : Optional[int], optional
+            The maximum depth to expand the tree (overrides self.depth if provided).
+        image : Optional[Any], optional
+            An optional image passed to the LanguageAgent along with the prompt.
+
+        Returns:
+        --------
+        TreeNode
+            The root node of the generated Tree of Thought.
+        """
+        if max_depth is None:
+            max_depth = self.depth
+
+        initial_response: str = self.language_agent.act(prompt, image=image)
+
+        if parse_function:
+            self.parse_function = parse_function
+            initial_thoughts: List[str] = self.parse_function(initial_response)
+        else:
+            initial_thoughts = [initial_response.strip()]
+
+        self.root = TreeNode(initial_thoughts[0])
+
+        self.expand(self.root, initial_thoughts[1:], max_depth)
+
+        return self.root
+ """ + if max_depth is None: + max_depth = self.depth + + initial_response: str = self.language_agent.act(prompt, image=image) + + if parse_function: + self.parse_function = parse_function + initial_thoughts: List[str] = self.parse_function(initial_response) + else: + initial_thoughts = [initial_response.strip()] + + self.root = TreeNode(initial_thoughts[0]) + + self.expand(self.root, initial_thoughts[1:], max_depth) + + return self.root + + def expand(self, node: TreeNode, thoughts: List[str], depth: int) -> None: + """ + Recursively expand the Tree of Thought from the given node. + + Parameters: + ----------- + node : TreeNode + The current node being expanded. + thoughts : List[str] + The list of thoughts or child states to expand from. + depth : int + The remaining depth to expand (i.e., how many steps ahead to think). + """ + if depth == 0 or not thoughts: + return + + for thought in thoughts: + print(thought) + child: TreeNode = node.add_child(thought) + + new_response: str = self.language_agent.act(thought) + + if new_response: + if hasattr(self, 'parse_function') and self.parse_function: + new_thoughts: List[str] = self.parse_function(new_response) + else: + new_thoughts = [new_response.strip()] + print(new_thoughts) + else: + new_thoughts = new_response + + self.expand(child, new_thoughts, depth - 1) + + def traverse_tree(self, node: Optional[TreeNode] = None, depth: int = 0) -> None: + """ + Traverse the Tree of Thought from the given node and print the decisions at each level. + + Parameters: + ----------- + node : Optional[TreeNode], optional + The starting node for traversal (default is None, which uses the root). + depth : int, optional + The current depth in the tree (used for printing indentation). + """ + if node is None: + node = self.root + + print(" " * depth + f"Thought: {node.state}") + + for child in node.children: + self.traverse_tree(child, depth + 1) + + +# Example usage: + +# if __name__ == "__main__": +# from mbodied.agents.language import LanguageAgent + +# context = [ +# {"role": "user", "content": "What should the robot do next?"}, +# {"role": "assistant", "content": "Understood!"} +# ] +# language_agent = LanguageAgent(context=context, api_key="your_api_key", model_src="openai") + +# def parse_json_output(response: str) -> List[str]: +# response = response.replace("```json", "").replace("```", "").strip() +# return json.loads(response) + +# tot = TreeOfThought(language_agent, depth=3) + +# root = tot.generate_thoughts("What should the robot do next?", parse_function=parse_json_output) + +# tot.traverse_tree(root) diff --git a/mbodied/EKF/world/__init__.py b/mbodied/EKF/world/__init__.py new file mode 100644 index 0000000..8883272 --- /dev/null +++ b/mbodied/EKF/world/__init__.py @@ -0,0 +1,3 @@ +from .world_vector import world_to_vector + +__all__ = ["world_to_vector"] \ No newline at end of file diff --git a/mbodied/EKF/world/world_vector.py b/mbodied/EKF/world/world_vector.py new file mode 100644 index 0000000..88f6ef9 --- /dev/null +++ b/mbodied/EKF/world/world_vector.py @@ -0,0 +1,75 @@ +import numpy as np +from mbodied.types.sense.world import WorldObject, World + +def world_object_to_vector(world_object: WorldObject) -> np.ndarray: + """ + Converts a WorldObject to a vector representation. + + Args: + world_object (WorldObject): The object in the world to be converted. + + Returns: + np.ndarray: A vector representing the object's state. 
+ """ + vector = [] + + if world_object.bbox_2d: + vector.extend([world_object.bbox_2d.x1, world_object.bbox_2d.y1, + world_object.bbox_2d.x2, world_object.bbox_2d.y2]) + else: + vector.extend([0, 0, 0, 0]) + + if world_object.bbox_3d: + vector.extend([world_object.bbox_3d.x1, world_object.bbox_3d.y1, world_object.bbox_3d.z1, + world_object.bbox_3d.x2, world_object.bbox_3d.y2, world_object.bbox_3d.z2]) + else: + vector.extend([0, 0, 0, 0, 0, 0]) + + if world_object.pose: + vector.extend([ + world_object.pose.x, + world_object.pose.y, + world_object.pose.z, + world_object.pose.roll, + world_object.pose.pitch, + world_object.pose.yaw + ]) + else: + vector.extend([0, 0, 0, 0, 0, 0, 0]) + + if world_object.pixel_coords: + vector.extend([world_object.pixel_coords.u, world_object.pixel_coords.v]) + else: + vector.extend([0, 0]) + + return np.array(vector) + + +def world_to_vector(world: World, include_image: bool = False, include_depth: bool = False, include_annotated: bool = False) -> np.ndarray: + """ + Converts the entire World object to a vector representation by + processing each WorldObject. + + Args: + world (World): The world to be converted. + include_image (bool): If True, include the image array in the vector. Defaults to False. + include_depth (bool): If True, include the depth array in the vector. Defaults to False. + include_annotated (bool): If True, include the annotated array in the vector. Defaults to False. + + Returns: + np.ndarray: A vector representing the state of the world. + """ + world_vector = [] + + if include_image and world.image: + world_vector.extend(world.image.array.flatten()) + if include_depth and world.depth: + world_vector.extend(world.depth.array.flatten()) + if include_annotated and world.annotated: + world_vector.extend(world.annotated.array.flatten()) + + for obj in world.objects: + obj_vector = world_object_to_vector(obj) + world_vector.extend(obj_vector) + + return np.array(world_vector)