import os
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
from supermemory import Supermemory
from dotenv import load_dotenv
load_dotenv()
class ResearchCrew:
def __init__(self):
self.memory = Supermemory()
self.search_tool = SerperDevTool()
def get_user_context(self, user_id: str, topic: str) -> dict:
"""Retrieve user profile and related research history."""
result = self.memory.profile(
container_tag=user_id,
q=topic,
threshold=0.5
)
return {
"expertise": result.profile.static or [],
"focus": result.profile.dynamic or [],
"history": [m.memory for m in (result.search_results.results or [])[:3]]
}
def create_researcher(self, context: dict) -> Agent:
"""Build a researcher agent with user context."""
expertise_note = ""
if context["expertise"]:
expertise_note = f"The user has this background: {', '.join(context['expertise'])}. Adjust technical depth accordingly."
history_note = ""
if context["history"]:
history_note = f"Previous research on related topics: {'; '.join(context['history'])}"
return Agent(
role="Research Analyst",
goal="Conduct research tailored to the user's expertise level",
backstory=f"""You research topics and synthesize findings into clear summaries.
{expertise_note}
{history_note}""",
tools=[self.search_tool],
verbose=True
)
def create_writer(self, context: dict) -> Agent:
"""Build a writer agent that matches user preferences."""
style_note = "Write in a clear, technical style."
for fact in context.get("expertise", []):
if "non-technical" in fact.lower():
style_note = "Write in plain language, avoiding jargon."
break
return Agent(
role="Content Writer",
goal="Transform research into readable content",
backstory=f"""You write clear, engaging content. {style_note}""",
verbose=True
)
def research(self, user_id: str, topic: str) -> str:
"""Run the research crew and store results."""
context = self.get_user_context(user_id, topic)
researcher = self.create_researcher(context)
writer = self.create_writer(context)
research_task = Task(
description=f"Research the following topic: {topic}",
expected_output="Detailed findings with sources",
agent=researcher
)
writing_task = Task(
description="Write a summary based on the research findings",
expected_output="A clear, structured summary",
agent=writer
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff()
# Store for future sessions
self.memory.add(
content=f"Research on '{topic}': {str(result)[:500]}",
container_tag=user_id,
metadata={"type": "research", "topic": topic}
)
return str(result)
if __name__ == "__main__":
crew = ResearchCrew()
# Teach preferences
crew.memory.add(
content="User prefers concise summaries with bullet points",
container_tag="researcher_1"
)
# Run research
result = crew.research("researcher_1", "latest developments in AI agents")
print(result)