The Agent API provides functionality for developing autonomous AI agents within the Neurenix framework. These agents can perceive their environment, make decisions, and take actions to achieve specific goals. The module supports various agent architectures, from simple reactive agents to complex cognitive agents with planning capabilities.
Neurenix's Agent API is designed to be flexible and extensible, allowing developers to create agents for a wide range of applications, including reinforcement learning, multi-agent systems, and autonomous decision-making.
An agent in Neurenix consists of several key components:
Neurenix supports multiple agent types:
Agents interact with environments, which provide observations and rewards. Neurenix supports various environment types:
neurenix.agent.Agent(name: str = None)
Base class for all agents in Neurenix.
Parameters:
- name: Optional name for the agent
Methods:
- perceive(observation): Process an observation from the environment
- decide(): Make a decision based on current state
- act(action): Execute an action in the environment
- learn(reward): Update the agent based on received reward
- reset(): Reset the agent to its initial state
Example:
import neurenix as nx
from neurenix.agent import Agent
class MyAgent(Agent):
def __init__(self, name=None):
super().__init__(name)
self.state = None
def perceive(self, observation):
self.state = observation
return self.state
def decide(self):
# Simple decision logic
if self.state > 0:
return "move_forward"
else:
return "move_backward"
def act(self, action):
print(f"Executing action: {action}")
return action
def learn(self, reward):
print(f"Received reward: {reward}")
def reset(self):
self.state = None
# Create an agent
agent = MyAgent("my_agent")
# Use the agent
observation = 1.0
state = agent.perceive(observation)
action = agent.decide()
agent.act(action)
agent.learn(0.5)
agent.reset()
neurenix.agent.ReactiveAgent(
policy: Callable[[Any], Any],
name: str = None
)
A simple reactive agent that maps observations directly to actions.
Parameters:
- policy: Function mapping observations to actions
- name: Optional name for the agent
Example:
from neurenix.agent import ReactiveAgent
# Define a simple policy
def simple_policy(observation):
if observation > 0:
return "move_forward"
else:
return "move_backward"
# Create a reactive agent
agent = ReactiveAgent(simple_policy, "reactive_agent")
# Use the agent
observation = 1.0
agent.perceive(observation)
action = agent.decide()
agent.act(action)
neurenix.agent.DeliberativeAgent(
policy: Callable[[Any], Any],
planner: Callable[[Any], List[Any]],
name: str = None
)
An agent that plans ahead before taking actions.
Parameters:
- policy: Function mapping states to actions
- planner: Function generating action plans
- name: Optional name for the agent
Methods:
- plan(): Generate a plan based on current state
Example:
from neurenix.agent import DeliberativeAgent
# Define a policy
def policy(state):
return state["plan"][0] if state["plan"] else "wait"
# Define a planner
def planner(state):
if state["goal"] == "reach_target":
return ["move_forward", "turn_right", "move_forward"]
return []
# Create a deliberative agent
agent = DeliberativeAgent(policy, planner, "deliberative_agent")
# Use the agent
state = {"goal": "reach_target", "position": [0, 0], "plan": []}
agent.perceive(state)
agent.plan() # Generate a plan
action = agent.decide()
agent.act(action)
neurenix.agent.LearningAgent(
policy: Callable[[Any], Any],
learning_algorithm: str,
learning_rate: float = 0.1,
name: str = None
)
An agent that improves its policy through experience.
Parameters:
- policy: Initial policy function
- learning_algorithm: Algorithm for learning ("q_learning", "sarsa", etc.)
- learning_rate: Rate of learning
- name: Optional name for the agent
Methods:
- update_policy(state, action, reward, next_state): Update the policy based on experience
Example:
from neurenix.agent import LearningAgent
# Define an initial policy
def initial_policy(state):
# Random initial policy
import random
actions = ["up", "down", "left", "right"]
return random.choice(actions)
# Create a learning agent
agent = LearningAgent(
policy=initial_policy,
learning_algorithm="q_learning",
learning_rate=0.1,
name="learning_agent"
)
# Use the agent
state = {"position": [0, 0]}
agent.perceive(state)
action = agent.decide()
agent.act(action)
reward = 0.5
next_state = {"position": [0, 1]}
agent.update_policy(state, action, reward, next_state)
neurenix.agent.Environment(name: str = None)
Base class for environments in which agents operate.
Parameters:
- name: Optional name for the environment
Methods:
- reset(): Reset the environment to its initial state
- step(action): Update the environment based on an action
- get_observation(): Get the current observation
- get_reward(action): Get the reward for an action
- is_done(): Check if the episode is complete
Example:
from neurenix.agent import Environment
class GridWorld(Environment):
def __init__(self, width=5, height=5, name=None):
super().__init__(name)
self.width = width
self.height = height
self.agent_pos = [0, 0]
self.target_pos = [width-1, height-1]
def reset(self):
self.agent_pos = [0, 0]
return self.get_observation()
def step(self, action):
if action == "up":
self.agent_pos[1] = max(0, self.agent_pos[1] - 1)
elif action == "down":
self.agent_pos[1] = min(self.height - 1, self.agent_pos[1] + 1)
elif action == "left":
self.agent_pos[0] = max(0, self.agent_pos[0] - 1)
elif action == "right":
self.agent_pos[0] = min(self.width - 1, self.agent_pos[0] + 1)
observation = self.get_observation()
reward = self.get_reward(action)
done = self.is_done()
return observation, reward, done
def get_observation(self):
return {
"agent_position": self.agent_pos,
"target_position": self.target_pos
}
def get_reward(self, action):
if self.agent_pos == self.target_pos:
return 1.0
return -0.01 # Small penalty for each step
def is_done(self):
return self.agent_pos == self.target_pos
# Create an environment
env = GridWorld(width=5, height=5, name="grid_world")
# Reset the environment
observation = env.reset()
# Take a step
action = "right"
next_observation, reward, done = env.step(action)
neurenix.agent.AgentManager(name: str = None)
Manages multiple agents in a shared environment.
Parameters:
- name: Optional name for the manager
Methods:
- add_agent(agent): Add an agent to the manager
- remove_agent(agent): Remove an agent from the manager
- set_environment(environment): Set the shared environment
- step(): Advance all agents by one step
- run(steps): Run the simulation for a specified number of steps
Example:
from neurenix.agent import AgentManager, ReactiveAgent, Environment
# Create agents
agent1 = ReactiveAgent(lambda obs: "right", "agent1")
agent2 = ReactiveAgent(lambda obs: "left", "agent2")
# Create environment
env = GridWorld(width=10, height=10, name="shared_grid")
# Create manager
manager = AgentManager("grid_manager")
manager.add_agent(agent1)
manager.add_agent(agent2)
manager.set_environment(env)
# Run simulation
manager.run(steps=100)
| Feature | Neurenix | TensorFlow |
|---|---|---|
| Agent Framework | Native | TF-Agents |
| Agent Types | Reactive, Deliberative, Hybrid, Learning | Primarily RL-based |
| Environment Integration | Flexible, custom environments | Gym-compatible environments |
| Multi-Agent Support | Native | Limited |
| Planning Capabilities | Built-in | Limited |
Neurenix provides a more comprehensive agent framework compared to TensorFlow's TF-Agents, with support for various agent architectures beyond reinforcement learning. While TF-Agents focuses primarily on RL algorithms, Neurenix's Agent API supports a wider range of agent types, including deliberative agents with planning capabilities and hybrid architectures.
| Feature | Neurenix | PyTorch |
|---|---|---|
| Agent Framework | Native | Third-party libraries |
| Agent Types | Reactive, Deliberative, Hybrid, Learning | Depends on library |
| Environment Integration | Flexible, custom environments | Depends on library |
| Multi-Agent Support | Native | Limited |
| Planning Capabilities | Built-in | Limited |
PyTorch does not provide a native agent framework, relying instead on third-party libraries like RLlib or PyTorch-RL. Neurenix's integrated Agent API offers a more cohesive experience, with consistent interfaces and better integration with the rest of the framework.
| Feature | Neurenix | Scikit-Learn |
|---|---|---|
| Agent Framework | Native | None |
| Agent Types | Reactive, Deliberative, Hybrid, Learning | N/A |
| Environment Integration | Flexible, custom environments | N/A |
| Multi-Agent Support | Native | N/A |
| Planning Capabilities | Built-in | N/A |
Scikit-Learn does not provide agent-based functionality, focusing instead on traditional machine learning algorithms. Neurenix fills this gap with its comprehensive Agent API, enabling the development of autonomous agents for various applications.
Design agents with clear separation of concerns:
from neurenix.agent import Agent
class WellDesignedAgent(Agent):
def __init__(self, name=None):
super().__init__(name)
self.perception_module = PerceptionModule()
self.decision_module = DecisionModule()
self.action_module = ActionModule()
self.learning_module = LearningModule()
def perceive(self, observation):
return self.perception_module.process(observation)
def decide(self):
return self.decision_module.select_action(self.state)
def act(self, action):
return self.action_module.execute(action)
def learn(self, reward):
self.learning_module.update(self.state, self.action, reward)
Use a consistent pattern for environment interaction:
# Create agent and environment
agent = MyAgent()
env = MyEnvironment()
# Reset environment
observation = env.reset()
done = False
while not done:
# Agent perceives the environment
agent.perceive(observation)
# Agent decides on an action
action = agent.decide()
# Agent acts in the environment
agent.act(action)
# Environment updates
observation, reward, done = env.step(action)
# Agent learns from the experience
agent.learn(reward)
For multi-agent systems, establish clear communication protocols:
from neurenix.agent import Agent, AgentManager
class CommunicatingAgent(Agent):
def __init__(self, name=None):
super().__init__(name)
self.messages = []
def send_message(self, recipient, content):
return {"sender": self.name, "recipient": recipient, "content": content}
def receive_message(self, message):
self.messages.append(message)
def decide(self):
# Consider messages in decision-making
# ...
return action
# Create a manager that handles communication
class CommunicatingManager(AgentManager):
def __init__(self, name=None):
super().__init__(name)
self.message_queue = []
def step(self):
# Collect messages from all agents
for agent in self.agents:
if hasattr(agent, "send_message"):
messages = agent.decide_messages()
self.message_queue.extend(messages)
# Deliver messages to recipients
for message in self.message_queue:
recipient_name = message["recipient"]
for agent in self.agents:
if agent.name == recipient_name and hasattr(agent, "receive_message"):
agent.receive_message(message)
self.message_queue = []
# Regular step
super().step()
import neurenix as nx
from neurenix.agent import ReactiveAgent, Environment
# Define a simple grid environment
class GridEnvironment(Environment):
def __init__(self, width=5, height=5):
super().__init__("grid")
self.width = width
self.height = height
self.agent_pos = [0, 0]
self.target_pos = [width-1, height-1]
def reset(self):
self.agent_pos = [0, 0]
return self.get_observation()
def step(self, action):
if action == "up":
self.agent_pos[1] = max(0, self.agent_pos[1] - 1)
elif action == "down":
self.agent_pos[1] = min(self.height - 1, self.agent_pos[1] + 1)
elif action == "left":
self.agent_pos[0] = max(0, self.agent_pos[0] - 1)
elif action == "right":
self.agent_pos[0] = min(self.width - 1, self.agent_pos[0] + 1)
observation = self.get_observation()
reward = self.get_reward(action)
done = self.is_done()
return observation, reward, done
def get_observation(self):
return {
"agent_position": self.agent_pos,
"target_position": self.target_pos
}
def get_reward(self, action):
if self.agent_pos == self.target_pos:
return 1.0
return -0.01
def is_done(self):
return self.agent_pos == self.target_pos
# Define a simple policy
def simple_policy(observation):
agent_pos = observation["agent_position"]
target_pos = observation["target_position"]
# Move horizontally first, then vertically
if agent_pos[0] < target_pos[0]:
return "right"
elif agent_pos[0] > target_pos[0]:
return "left"
elif agent_pos[1] < target_pos[1]:
return "down"
elif agent_pos[1] > target_pos[1]:
return "up"
else:
return "stay"
# Create the agent and environment
agent = ReactiveAgent(simple_policy, "grid_agent")
env = GridEnvironment(width=5, height=5)
# Run the simulation
observation = env.reset()
done = False
total_reward = 0
while not done:
# Agent perceives the environment
agent.perceive(observation)
# Agent decides on an action
action = agent.decide()
# Agent acts in the environment
agent.act(action)
# Environment updates
observation, reward, done = env.step(action)
total_reward += reward
print(f"Position: {env.agent_pos}, Action: {action}, Reward: {reward}")
print(f"Total reward: {total_reward}")
import neurenix as nx
import numpy as np
from neurenix.agent import Agent, Environment
# Define a Q-learning agent
class QLearningAgent(Agent):
def __init__(self, state_size, action_size, learning_rate=0.1, discount_factor=0.9, exploration_rate=1.0, name=None):
super().__init__(name)
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.exploration_rate = exploration_rate
self.exploration_decay = 0.995
self.min_exploration_rate = 0.01
# Initialize Q-table
self.q_table = np.zeros((state_size, action_size))
self.state = None
self.action = None
def perceive(self, observation):
self.state = observation
return self.state
def decide(self):
# Exploration-exploitation trade-off
if np.random.rand() < self.exploration_rate:
# Explore: choose a random action
self.action = np.random.randint(self.action_size)
else:
# Exploit: choose the best action based on Q-values
self.action = np.argmax(self.q_table[self.state])
return self.action
def act(self, action):
return action
def learn(self, reward, next_state, done):
# Q-learning update
best_next_action = np.argmax(self.q_table[next_state])
td_target = reward + self.discount_factor * self.q_table[next_state, best_next_action] * (1 - done)
td_error = td_target - self.q_table[self.state, self.action]
self.q_table[self.state, self.action] += self.learning_rate * td_error
# Update exploration rate
if done:
self.exploration_rate = max(self.min_exploration_rate, self.exploration_rate * self.exploration_decay)
def reset(self):
self.state = None
self.action = None
# Define a simple grid environment
class GridEnvironment(Environment):
def __init__(self, width=5, height=5):
super().__init__("grid")
self.width = width
self.height = height
self.agent_pos = [0, 0]
self.target_pos = [width-1, height-1]
# Define state space as a flattened grid
self.state_size = width * height
self.action_size = 4 # up, down, left, right
def reset(self):
self.agent_pos = [0, 0]
return self.get_state()
def step(self, action):
# Convert action index to direction
if action == 0: # up
self.agent_pos[1] = max(0, self.agent_pos[1] - 1)
elif action == 1: # down
self.agent_pos[1] = min(self.height - 1, self.agent_pos[1] + 1)
elif action == 2: # left
self.agent_pos[0] = max(0, self.agent_pos[0] - 1)
elif action == 3: # right
self.agent_pos[0] = min(self.width - 1, self.agent_pos[0] + 1)
state = self.get_state()
reward = self.get_reward(action)
done = self.is_done()
return state, reward, done
def get_state(self):
# Convert 2D position to 1D state index
return self.agent_pos[1] * self.width + self.agent_pos[0]
def get_reward(self, action):
if self.agent_pos == self.target_pos:
return 1.0
return -0.01
def is_done(self):
return self.agent_pos == self.target_pos
# Create the agent and environment
env = GridEnvironment(width=5, height=5)
agent = QLearningAgent(
state_size=env.state_size,
action_size=env.action_size,
learning_rate=0.1,
discount_factor=0.9,
exploration_rate=1.0,
name="q_learning_agent"
)
# Training loop
num_episodes = 1000
max_steps = 100
for episode in range(num_episodes):
state = env.reset()
agent.perceive(state)
total_reward = 0
done = False
step = 0
while not done and step < max_steps:
# Agent decides on an action
action = agent.decide()
# Agent acts in the environment
agent.act(action)
# Environment updates
next_state, reward, done = env.step(action)
total_reward += reward
# Agent learns from the experience
agent.learn(reward, next_state, done)
# Update state
state = next_state
agent.perceive(state)
step += 1
if episode % 100 == 0:
print(f"Episode {episode}, Total Reward: {total_reward}, Steps: {step}, Exploration Rate: {agent.exploration_rate:.4f}")
# Test the trained agent
state = env.reset()
agent.perceive(state)
agent.exploration_rate = 0 # No exploration during testing
done = False
total_reward = 0
step = 0
print("\nTesting the trained agent:")
while not done and step < max_steps:
action = agent.decide()
agent.act(action)
next_state, reward, done = env.step(action)
total_reward += reward
print(f"Step {step}, Position: {env.agent_pos}, Action: {action}, Reward: {reward}")
state = next_state
agent.perceive(state)
step += 1
print(f"Total reward: {total_reward}, Steps: {step}")
The Agent API in Neurenix provides a comprehensive framework for developing autonomous AI agents. With support for various agent architectures, from simple reactive agents to complex learning agents, the API enables developers to create intelligent systems for a wide range of applications.
Compared to other frameworks, Neurenix's Agent API offers unique advantages in terms of flexibility, integration with the rest of the framework, and support for advanced agent capabilities like planning and multi-agent coordination. These features make Neurenix particularly well-suited for developing autonomous AI systems.