Based on my research, I'll create a structured POC that connects to a real Mem0 client with Neo4j as the graph store. Here's a complete implementation:
This POC demonstrates how to implement the proposed contextual AI assistant using Mem0's hybrid memory architecture with Neo4j as the graph database backend.
contextual-assistant-poc/
├── requirements.txt
├── .env
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── memory/
│   │   ├── __init__.py
│   │   └── mem0_client.py
│   ├── ingestion/
│   │   ├── __init__.py
│   │   ├── processor.py
│   │   └── parsers/
│   │       ├── __init__.py
│   │       ├── base.py
│   │       ├── messaging.py
│   │       └── github.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes.py
│   │   └── models.py
│   └── query/
│       ├── __init__.py
│       ├── processor.py
│       └── strategies.py
└── tests/
    ├── __init__.py
    ├── test_ingest.py
    └── test_query.py
First, let's create our requirements.txt file:
# requirements.txt
fastapi==0.104.0
uvicorn==0.23.2
python-dotenv==1.0.0
mem0ai==0.1.81
httpx==0.25.0
pydantic==2.4.2
neo4j==5.14.0
Next, let's set up our environment variables in .env:
# .env
OPENAI_API_KEY=your_openai_api_key
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password
Let's create our configuration module:
# app/config.py
import os
from dotenv import load_dotenv
# Read a local .env file (if any) into the process environment before the
# settings below are resolved.
load_dotenv()

# API settings: bind address and port for the HTTP server.
API_HOST = os.environ.get("API_HOST", "0.0.0.0")
API_PORT = int(os.environ.get("API_PORT", "8000"))  # ValueError if not numeric

# Neo4j settings: connection URI and credentials, with local-development defaults.
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "password")

# OpenAI settings: no default, so this is None when the variable is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# Mem0 settings: optional, None when the variable is unset.
# NOTE(review): presumably only needed for the hosted Mem0 service — confirm.
MEM0_API_KEY = os.environ.get("MEM0_API_KEY")
Now, let's create our Mem0 client integration with Neo4j:
# app/memory/mem0_client.py
import os
from typing import Dict, List, Any, Optional
import datetime
from mem0ai import Memory
class Mem0Client:
    """Client for interacting with Mem0 memory system with Neo4j as graph store.

    Every call is scoped to the ``user_id`` given at construction time.

    NOTE(review): the ``Memory(graph_store_config=...)`` keyword and the
    ``add_relationship`` / ``query_graph`` methods are used exactly as this
    POC wrote them; confirm they exist in the installed mem0ai release.
    """

    def __init__(self, user_id: str):
        """Initialize Mem0 client with Neo4j configuration.

        Args:
            user_id: Identifier forwarded as ``user_id`` on every Mem0 call.
        """
        self.user_id = user_id
        # Connection details come from the environment, with local defaults.
        self.memory = Memory(
            graph_store_config={
                "provider": "neo4j",
                "uri": os.getenv("NEO4J_URI", "bolt://localhost:7687"),
                "username": os.getenv("NEO4J_USER", "neo4j"),
                "password": os.getenv("NEO4J_PASSWORD", "password"),
                "database": "neo4j"  # Default Neo4j database
            }
        )

    def add(self, entry: Dict[str, Any]) -> str:
        """Add a memory entry to Mem0 and store its relationships.

        Args:
            entry: Parsed screen content. Keys read here: ``raw_content``,
                ``app_type``, ``entities``, ``relationships``, ``metadata``
                and ``timestamp``. A missing ``timestamp`` is filled in on
                the passed dict itself.

        Returns:
            Whatever ``Memory.add`` returns for the new memory.
        """
        # Stamp the entry in place when the caller did not provide a timestamp.
        entry.setdefault("timestamp", datetime.datetime.now().isoformat())

        entities = entry.get("entities", [])
        relationships = entry.get("relationships", [])

        memory_id = self.memory.add(
            content=entry.get("raw_content", ""),
            metadata={
                "app_type": entry.get("app_type"),
                "entities": entities,
                "relationships": relationships,
                "metadata": entry.get("metadata", {}),
                # Previously the timestamp was computed and then dropped.
                "timestamp": entry["timestamp"],
            },
            user_id=self.user_id
        )

        self._store_relationships(relationships, entities)
        return memory_id

    def _store_relationships(self, relationships: List[Dict[str, Any]], entities: List[Dict[str, Any]]):
        """Store relationships in the graph.

        A relationship is skipped when it lacks ``source``, ``target`` or
        ``type``, or when an endpoint is not among ``entities``. The target
        id ``"me"`` is always accepted and treated as an implicit user entity.
        """
        # Index entities by id; entities without an id cannot be referenced.
        entity_map = {entity["id"]: entity for entity in entities if "id" in entity}

        for rel in relationships:
            source_id = rel.get("source")
            target_id = rel.get("target")
            rel_type = rel.get("type")
            if not all([source_id, target_id, rel_type]):
                continue

            source_entity = entity_map.get(source_id)
            if target_id == "me":
                # The current user is never listed among the parsed entities.
                target_entity = {"id": "me", "type": "user", "name": "Me"}
            else:
                target_entity = entity_map.get(target_id)

            if not source_entity or not target_entity:
                continue

            self.memory.add_relationship(
                source_id=source_id,
                target_id=target_id,
                relationship_type=rel_type,
                metadata={
                    "source_type": source_entity.get("type"),
                    "target_type": target_entity.get("type"),
                    "timestamp": datetime.datetime.now().isoformat()
                },
                user_id=self.user_id
            )

    @staticmethod
    def _cypher_literal(value: Any) -> str:
        """Render ``value`` as a single-quoted, escaped Cypher string literal."""
        # Escape backslashes first so the quote escapes are not doubled.
        text = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"

    @classmethod
    def _build_graph_query(cls, query: str, filters: Dict[str, Any], limit: int) -> str:
        """Build the Cypher text for a graph search.

        The query shape is chosen by keyword matching on ``query``. Filters
        are ANDed with the shape's fixed conditions under a single WHERE and
        applied to the node the shape is looking for.

        Raises:
            ValueError: If a filter key is not a plain identifier, or if
                ``limit`` cannot be converted to an integer.
        """
        lowered = query.lower()
        if "team" in lowered and "member" in lowered:
            # Team members
            pattern, alias, returns = "(p)-[r:member_of]->(t)", "p", "p, r, t"
            conditions = []
        elif "unreplied" in lowered and "message" in lowered:
            # Unreplied messages
            pattern, alias, returns = "(p)-[r:authored]->(m)", "m", "p, r, m"
            conditions = ["m.status = 'unreplied'"]
        elif "review" in lowered and ("pr" in lowered or "pull request" in lowered):
            # Pull requests waiting for review
            pattern, alias, returns = "(pr)", "pr", "pr"
            conditions = ["pr.type = 'pull_request'", "pr.review_status = 'pending'"]
        else:
            # Generic relationship query
            pattern, alias, returns = "(n)-[r]->(m)", "n", "n, r, m"
            conditions = []

        # NOTE(review): filters target the member / message / PR node (or `n`
        # in the generic shape); the original always used `n`, which is
        # unbound in the first three shapes. Confirm the intended target.
        for key, value in filters.items():
            # Keys are spliced in as property names, so only identifiers pass.
            if not (isinstance(key, str) and key.isidentifier()):
                raise ValueError(f"Invalid graph filter key: {key!r}")
            # Values stay quoted strings, as in the original query text.
            conditions.append(f"{alias}.{key} = {cls._cypher_literal(value)}")

        lines = [f"MATCH {pattern}"]
        if conditions:
            lines.append("WHERE " + " AND ".join(conditions))
        lines.append(f"RETURN {returns}")
        # int() keeps arbitrary text out of the LIMIT clause.
        lines.append(f"LIMIT {int(limit)}")
        return "\n".join(lines)

    def search(self, query: str, context_type: str = "vector",
               filters: Optional[Dict[str, Any]] = None, limit: int = 10) -> List[Dict[str, Any]]:
        """Search for relevant information in Mem0.

        Args:
            query: Free-text query. For ``"graph"`` searches its keywords
                also select the Cypher query shape.
            context_type: ``"vector"``, ``"graph"`` or ``"key_value"``.
            filters: Optional property/metadata filters.
            limit: Maximum number of results.

        Returns:
            The backend's results, or ``[]`` for an unknown ``context_type``.

        Raises:
            ValueError: For a graph search with an invalid filter key or a
                non-integer ``limit``.
        """
        filters = filters or {}

        if context_type in ("vector", "key_value"):
            # Key-value lookups use the same metadata-filtered search call.
            return self.memory.search(
                query=query,
                user_id=self.user_id,
                limit=limit,
                filters=filters
            )

        if context_type == "graph":
            cypher_query = self._build_graph_query(query, filters, limit)
            return self.memory.query_graph(
                cypher_query=cypher_query,
                user_id=self.user_id
            )

        return []

    def get(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific memory by ID."""
        return self.memory.get(memory_id, user_id=self.user_id)

    def get_many(self, memory_ids: List[str]) -> List[Dict[str, Any]]:
        """Get multiple memories by ID, omitting ids that return nothing."""
        fetched = (self.get(memory_id) for memory_id in memory_ids)
        return [memory for memory in fetched if memory]
Now, let's implement the ingestion pipeline:
# app/ingestion/parsers/base.py
from abc import ABC, abstractmethod
from typing import Dict, List, Any
class BaseParser(ABC):
    """Abstract interface implemented by every screen-content parser.

    Subclasses must override both methods; the class cannot be instantiated
    until they do.
    """

    @abstractmethod
    def get_app_type(self) -> str:
        """Return the application type this parser handles."""
        ...

    @abstractmethod
    def parse(self, content: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Parse screen content and extract entities and relationships.

        Args:
            content: Raw text captured from the screen.
            metadata: Additional information accompanying the capture.
        """
        ...
# app/ingestion/parsers/messaging.py
import uuid
from typing import Dict, List, Any
from .base import BaseParser
class MessagingParser(BaseParser):
    """Parser for messaging applications (WhatsApp, Telegram, etc.)"""

    def get_app_type(self) -> str:
        """Return the application type handled by this parser."""
        return "messaging"

    def parse(self, content: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Parse messaging content and extract entities and relationships.

        Each line of the form ``sender: message`` yields a ``message``
        entity, an ``authored`` relationship from the sender's ``person``
        entity and, unless the sender is ``"Me"``, a ``received_by``
        relationship from the message to ``"me"``. A sender gets exactly one
        ``person`` entity however many messages they send.

        Args:
            content: Conversation text, one message per line.
            metadata: Capture metadata; ``app_name`` is copied onto each
                message (default ``"messaging"``).

        Returns:
            A dict with ``entities`` and ``relationships`` lists.
        """
        entities = []
        relationships = []
        # sender name -> person id, so a returning sender reuses their
        # entity instead of getting a duplicate with a new id.
        person_ids = {}

        # Split on real line breaks; the original split on a literal
        # backslash-backslash-n sequence and never split multi-line content.
        for line in content.splitlines():
            sender, separator, message = line.partition(":")
            if not separator:
                continue  # no "sender: message" shape on this line
            sender = sender.strip()
            message = message.strip()
            if not sender:
                continue  # a leading colon gives no usable sender name

            person_id = person_ids.get(sender)
            if person_id is None:
                person_id = f"person_{uuid.uuid4()}"
                person_ids[sender] = person_id
                entities.append({
                    "type": "person",
                    "id": person_id,
                    "name": sender
                })

            message_id = f"message_{uuid.uuid4()}"
            entities.append({
                "type": "message",
                "id": message_id,
                "content": message,
                "sender": sender,
                # Heuristic: a question from someone else awaits a reply.
                "status": "unreplied" if sender != "Me" and "?" in message else "normal",
                "app": metadata.get("app_name", "messaging")
            })

            relationships.append({
                "source": person_id,
                "target": message_id,
                "type": "authored"
            })

            # Messages from other people are addressed to the current user.
            if sender != "Me":
                relationships.append({
                    "source": message_id,
                    "target": "me",
                    "type": "received_by"
                })

        return {
            "entities": entities,
            "relationships": relationships
        }