# langgraph_workflow.py
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated, Sequence
import operator
class AgentState(TypedDict):
    """State schema for multi-agent workflow"""
    messages: Annotated[Sequence[HumanMessage | AIMessage], operator.add]
    current_agent: str
    context: dict
    research_results: list
    code_output: str
    review_status: str

def researcher_node(state: AgentState) -> dict:
    """Research agent node - gathers information"""
    llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0.3)
    research_prompt = f"""
    You are a research specialist. Based on this request:
    {state['messages'][-1].content}
    Conduct thorough research and provide:
    1. Key concepts and technologies involved
    2. Best practices and patterns
    3. Potential challenges and solutions
    4. Relevant documentation and examples
    """
    response = llm.invoke([HumanMessage(content=research_prompt)])
    # Return only the keys this node updates; returning the full state would
    # re-add the existing messages through the operator.add reducer.
    return {
        'research_results': state['research_results'] + [{
            'agent': 'researcher',
            'findings': response.content
        }],
        'current_agent': 'planner'
    }

def planner_node(state: AgentState) -> dict:
    """Planning agent node - creates execution plan"""
    llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0.2)
    planning_prompt = f"""
    Based on research findings:
    {state['research_results'][-1]['findings']}
    Create a detailed implementation plan:
    1. Break down into specific tasks
    2. Identify dependencies
    3. Suggest optimal execution order
    4. Define success criteria
    """
    response = llm.invoke([HumanMessage(content=planning_prompt)])
    # The operator.add reducer appends this message to the running history
    return {
        'messages': [AIMessage(content=response.content)],
        'current_agent': 'coder'
    }

def coder_node(state: AgentState) -> dict:
    """Coding agent node - implements solution"""
    llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0.1)
    coding_prompt = f"""
    Implementation plan:
    {state['messages'][-1].content}
    Write production-ready code:
    1. Follow best practices from research
    2. Include error handling
    3. Add comprehensive comments
    4. Implement all planned features
    """
    response = llm.invoke([HumanMessage(content=coding_prompt)])
    return {
        'code_output': response.content,
        'current_agent': 'reviewer'
    }

def reviewer_node(state: AgentState) -> dict:
    """Review agent node - validates implementation"""
    llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0.2)
    review_prompt = f"""
    Review this implementation:
    {state['code_output']}
    Check for:
    1. Code quality and best practices
    2. Error handling and edge cases
    3. Performance considerations
    4. Security vulnerabilities
    5. Documentation completeness
    Provide: APPROVED or NEEDS_REVISION with specific feedback
    """
    response = llm.invoke([HumanMessage(content=review_prompt)])
    review_status = 'APPROVED' if 'APPROVED' in response.content else 'NEEDS_REVISION'
    return {
        'review_status': review_status,
        'messages': [AIMessage(content=response.content)]
    }

def should_revise(state: AgentState) -> str:
    """Conditional routing - revise or complete"""
    if state['review_status'] == 'NEEDS_REVISION':
        return 'coder'  # Send back to coder
    return 'end'

# Build the workflow graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node('researcher', researcher_node)
workflow.add_node('planner', planner_node)
workflow.add_node('coder', coder_node)
workflow.add_node('reviewer', reviewer_node)
# Define edges
workflow.set_entry_point('researcher')
workflow.add_edge('researcher', 'planner')
workflow.add_edge('planner', 'coder')
workflow.add_edge('coder', 'reviewer')
# Conditional edge for revision loop
workflow.add_conditional_edges(
    'reviewer',
    should_revise,
    {
        'coder': 'coder',
        'end': END
    }
)
# Compile the graph
app = workflow.compile()
# Execute workflow
initial_state = {
    'messages': [HumanMessage(content="Build a REST API for user authentication with JWT")],
    'current_agent': 'researcher',
    'context': {},
    'research_results': [],
    'code_output': '',
    'review_status': ''
}
result = app.invoke(initial_state)
print(f"Final output: {result['code_output']}")
print(f"Review: {result['review_status']}")

# crewai_orchestration.py
from crewai import Agent, Task, Crew, Process
from langchain_anthropic import ChatAnthropic
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.tools import tool
# Initialize LLM
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0.3)
# Define custom tools
@tool
def code_analyzer(code: str) -> str:
    """Analyze code for quality, security, and performance issues"""
    # Implementation here
    return f"Analysis results for code: {code[:100]}..."

@tool
def test_generator(code: str) -> str:
    """Generate comprehensive test cases for given code"""
    # Implementation here
    return f"Generated tests for: {code[:100]}..."

# Define agents with specific roles
research_agent = Agent(
    role='Senior Research Analyst',
    goal='Conduct thorough research on technical topics and provide comprehensive insights',
    backstory="""You are a seasoned research analyst with expertise in software
    architecture and emerging technologies. You excel at gathering information
    from multiple sources and synthesizing it into actionable insights.""",
    tools=[DuckDuckGoSearchRun()],
    llm=llm,
    verbose=True,
    allow_delegation=False
)

architect_agent = Agent(
    role='Software Architect',
    goal='Design scalable, maintainable system architectures',
    backstory="""You are an experienced software architect who specializes in
    designing distributed systems. You consider scalability, security, and
    maintainability in every design decision.""",
    llm=llm,
    verbose=True,
    allow_delegation=True
)

developer_agent = Agent(
    role='Senior Full-Stack Developer',
    goal='Implement high-quality, production-ready code',
    backstory="""You are a senior developer with 10+ years of experience. You
    write clean, well-tested code following SOLID principles and best practices.
    You always include error handling and comprehensive documentation.""",
    tools=[code_analyzer],
    llm=llm,
    verbose=True,
    allow_delegation=False
)

qa_agent = Agent(
    role='QA Engineer',
    goal='Ensure code quality through comprehensive testing',
    backstory="""You are a meticulous QA engineer who believes in thorough testing.
    You create comprehensive test suites covering unit, integration, and edge cases.
    You catch bugs before they reach production.""",
    tools=[test_generator, code_analyzer],
    llm=llm,
    verbose=True,
    allow_delegation=False
)

devops_agent = Agent(
    role='DevOps Engineer',
    goal='Create robust CI/CD pipelines and deployment strategies',
    backstory="""You are a DevOps expert focused on automation and reliability.
    You design CI/CD pipelines, implement monitoring, and ensure smooth deployments
    with zero downtime.""",
    llm=llm,
    verbose=True,
    allow_delegation=False
)

# Define sequential tasks
research_task = Task(
    description="""Research best practices for building a scalable microservices
    architecture with Node.js, including:
    1. Service communication patterns
    2. Data consistency strategies
    3. Authentication and authorization
    4. Monitoring and observability
    Provide a comprehensive research report.""",
    agent=research_agent,
    expected_output="Detailed research report with best practices and recommendations"
)

architecture_task = Task(
    description="""Based on the research findings, design a complete microservices
    architecture including:
    1. Service boundaries and responsibilities
    2. Communication protocols (REST, gRPC, message queues)
    3. Data storage strategy
    4. Security architecture
    5. Scalability considerations
    Create detailed architecture diagrams and documentation.""",
    agent=architect_agent,
    expected_output="Complete architecture design with diagrams and documentation"
)

implementation_task = Task(
    description="""Implement the core services based on the architecture design:
    1. User service with authentication
    2. API Gateway with rate limiting
    3. Service discovery and registration
    4. Shared middleware and utilities
    Include comprehensive error handling and logging.""",
    agent=developer_agent,
    expected_output="Production-ready code for core microservices"
)

testing_task = Task(
    description="""Create comprehensive test suite for all implemented services:
    1. Unit tests for business logic
    2. Integration tests for service communication
    3. End-to-end tests for critical flows
    4. Performance and load tests
    Ensure >80% code coverage.""",
    agent=qa_agent,
    expected_output="Complete test suite with coverage reports"
)

deployment_task = Task(
    description="""Design and implement CI/CD pipeline:
    1. Automated builds and tests
    2. Docker containerization
    3. Kubernetes deployment manifests
    4. Monitoring and alerting setup
    5. Blue-green deployment strategy
    Include deployment documentation.""",
    agent=devops_agent,
    expected_output="Complete CI/CD pipeline with deployment documentation"
)

# Create crew with sequential process
crew = Crew(
    agents=[research_agent, architect_agent, developer_agent, qa_agent, devops_agent],
    tasks=[research_task, architecture_task, implementation_task, testing_task, deployment_task],
    process=Process.sequential,
    verbose=True
)
# Execute the crew
result = crew.kickoff()
print(f"\n\nFinal Result:\n{result}")# hybrid_orchestration.py
from langgraph.graph import StateGraph, END
from crewai import Agent, Task, Crew, Process
from typing import TypedDict, List
import asyncio
class HybridState(TypedDict):
    task_description: str
    research_data: dict
    crew_output: str
    validation_result: str
    iterations: int

class HybridOrchestrator:
    def __init__(self):
        self.max_iterations = 3
        self.graph = self._build_graph()

    def _build_graph(self) -> StateGraph:
        """Build hybrid workflow graph"""
        workflow = StateGraph(HybridState)
        workflow.add_node('research', self.research_node)
        workflow.add_node('crew_execution', self.crew_node)
        workflow.add_node('validation', self.validation_node)
        workflow.set_entry_point('research')
        workflow.add_edge('research', 'crew_execution')
        workflow.add_edge('crew_execution', 'validation')
        workflow.add_conditional_edges(
            'validation',
            self.should_continue,
            {
                'crew_execution': 'crew_execution',
                'end': END
            }
        )
        return workflow.compile()

    def research_node(self, state: HybridState) -> HybridState:
        """LangGraph research phase"""
        # Use LangGraph for complex research workflow
        state['research_data'] = {
            'context': f"Research for: {state['task_description']}",
            'findings': 'Comprehensive research results...'
        }
        return state

    def crew_node(self, state: HybridState) -> HybridState:
        """CrewAI execution phase"""
        # Create specialized crew based on research.
        # _create_specialized_agents and _create_tasks are project-specific
        # helpers (not shown here) that build agents and tasks from the findings.
        agents = self._create_specialized_agents(state['research_data'])
        tasks = self._create_tasks(state['research_data'])
        crew = Crew(
            agents=agents,
            tasks=tasks,
            process=Process.sequential
        )
        result = crew.kickoff()
        state['crew_output'] = str(result)
        state['iterations'] += 1
        return state

    def validation_node(self, state: HybridState) -> HybridState:
        """Validation phase"""
        # Validate crew output (_validate_output is a project-specific check)
        is_valid = self._validate_output(state['crew_output'])
        state['validation_result'] = 'VALID' if is_valid else 'INVALID'
        return state

    def should_continue(self, state: HybridState) -> str:
        """Determine if iteration should continue"""
        if state['validation_result'] == 'VALID':
            return 'end'
        if state['iterations'] >= self.max_iterations:
            return 'end'
        return 'crew_execution'

    def execute(self, task: str) -> str:
        """Execute hybrid orchestration"""
        initial_state = {
            'task_description': task,
            'research_data': {},
            'crew_output': '',
            'validation_result': '',
            'iterations': 0
        }
        result = self.graph.invoke(initial_state)
        return result['crew_output']

# Usage
orchestrator = HybridOrchestrator()
result = orchestrator.execute(
"Build a real-time analytics dashboard with WebSocket support"
)
print(f"Final output: {result}")# agent_memory.py
from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic
from typing import Any, Dict
import json
class AgentMemoryManager:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4-5")
        self.agent_memories = {}
        self.shared_context = {}

    def create_agent_memory(self, agent_id: str, memory_type: str = 'buffer'):
        """Create memory for specific agent"""
        if memory_type == 'buffer':
            self.agent_memories[agent_id] = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True
            )
        elif memory_type == 'summary':
            self.agent_memories[agent_id] = ConversationSummaryMemory(
                llm=self.llm,
                memory_key="chat_history",
                return_messages=True
            )

    def update_shared_context(self, key: str, value: Any):
        """Update shared context accessible to all agents"""
        self.shared_context[key] = value

    def get_agent_context(self, agent_id: str) -> Dict:
        """Get combined context for agent"""
        agent_memory = self.agent_memories.get(agent_id)
        context = {
            'shared': self.shared_context,
            'agent_history': agent_memory.load_memory_variables({}) if agent_memory else {}
        }
        return context

    def save_interaction(self, agent_id: str, human_input: str, ai_output: str):
        """Save interaction to agent memory"""
        memory = self.agent_memories.get(agent_id)
        if memory:
            memory.save_context(
                {"input": human_input},
                {"output": ai_output}
            )

# Usage in multi-agent workflow
memory_manager = AgentMemoryManager()
# Create memories for each agent
for agent_id in ['researcher', 'planner', 'coder', 'reviewer']:
    memory_manager.create_agent_memory(agent_id, 'summary')

# Update shared context
memory_manager.update_shared_context('project_requirements', {
    'framework': 'FastAPI',
    'database': 'PostgreSQL',
    'auth': 'JWT'
})
# Agents access context
context = memory_manager.get_agent_context('coder')
print(f"Coder context: {context}"){
"maxTokens": 4000,
"temperature": 0.3,
"systemPrompt": "You are a multi-agent orchestration specialist focused on building complex workflows with LangGraph and CrewAI"
}LangGraph state transitions failing with cyclic dependency errors
Fix: Define the StateGraph with an explicit node order and use conditional edges whose router function returns the name of the next node. Avoid circular references involving the END node. Debug by visualizing the flow with graph.get_graph().draw_mermaid().
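A minimal sketch of the pattern, using a toy two-node loop with a done flag in the state (the names LoopState, work, and route are illustrative, not part of the examples above):

from typing import TypedDict
from langgraph.graph import StateGraph, END

class LoopState(TypedDict):
    done: bool

def work(state: LoopState) -> dict:
    # Do one unit of work, then signal completion
    return {'done': True}

def route(state: LoopState) -> str:
    # The router returns a key from the mapping passed to add_conditional_edges
    return 'finish' if state['done'] else 'retry'

graph = StateGraph(LoopState)
graph.add_node('work', work)
graph.set_entry_point('work')
graph.add_conditional_edges('work', route, {'retry': 'work', 'finish': END})
app = graph.compile()

# Visualize the flow to spot unintended cycles
print(app.get_graph().draw_mermaid())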
Problem: CrewAI agents not communicating results between sequential tasks
Fix: Use Crew task context propagation: set context=[previous_task] on a task so the earlier output is passed into its prompt. Verify the agent role definitions, and note that crew.kickoff() returns only the final task's output. Enable verbose=True for debugging.
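A sketch of explicit context passing, reusing research_agent and developer_agent from crewai_orchestration.py above; the task descriptions here are abbreviated placeholders:

from crewai import Task, Crew, Process

research_task = Task(
    description="Research JWT authentication best practices",
    expected_output="Research summary",
    agent=research_agent
)
implementation_task = Task(
    description="Implement the auth service using the research findings",
    expected_output="Working implementation",
    agent=developer_agent,
    context=[research_task]  # research_task's output is injected into this task's prompt
)
crew = Crew(
    agents=[research_agent, developer_agent],
    tasks=[research_task, implementation_task],
    process=Process.sequential,
    verbose=True
)
print(crew.kickoff())  # returns the final task's output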
Problem: Multi-agent orchestration stuck in an infinite loop or deadlock
Fix: Add a max-iterations guard to the graph and set recursion_limit=50 (or similar) in the invocation config. Enforce a timeout with asyncio.wait_for(), use checkpoint persistence so interrupted runs can resume, and log state transitions to monitor progress.
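Both guards in one sketch, assuming the compiled app and initial_state from langgraph_workflow.py above; the 50-step limit and 120-second timeout are illustrative values:

import asyncio

# Hard cap on state transitions; LangGraph raises GraphRecursionError if exceeded
result = app.invoke(initial_state, config={"recursion_limit": 50})

# Wall-clock timeout around an async invocation
async def run_with_timeout():
    return await asyncio.wait_for(app.ainvoke(initial_state), timeout=120)

result = asyncio.run(run_with_timeout())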
Problem: Agent coordination failing with inconsistent shared-state updates
Fix: Route all writes through a centralized state manager with locking so transitions are atomic, or serialize updates through a queue. In LangGraph, use the compiled graph's update_state() (with a checkpointer) for controlled writes, and keep state versions to allow rollback.
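An illustrative lock-guarded state manager (not a LangGraph API) that serializes updates and keeps version snapshots for rollback:

import threading

class SharedStateManager:
    """Centralized store: every agent writes through one lock."""
    def __init__(self):
        self._lock = threading.Lock()
        self._state = {}
        self._versions = []  # snapshots for rollback

    def update(self, key: str, value) -> None:
        with self._lock:  # atomic read-modify-write
            self._versions.append(dict(self._state))
            self._state[key] = value

    def snapshot(self) -> dict:
        with self._lock:
            return dict(self._state)

    def rollback(self) -> None:
        with self._lock:
            if self._versions:
                self._state = self._versions.pop()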
Problem: Memory overflow when processing large agent conversation histories
Fix: Keep a sliding window of recent context (for example, the last 10 messages), summarize older messages, and store the full history in a database. Cap max_tokens per agent and clear an agent's memory with agent.memory.clear() after each task.
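A sliding-window trim covers the in-context portion; this sketch keeps the last 10 messages and assumes older history is summarized or persisted elsewhere (MAX_WINDOW is an illustrative cap):

from langchain_core.messages import BaseMessage

MAX_WINDOW = 10

def trim_history(messages: list[BaseMessage]) -> list[BaseMessage]:
    """Keep only the most recent messages; archive or summarize the rest upstream."""
    return messages[-MAX_WINDOW:]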