Based on your requirements, I'll provide a complete implementation plan for building a GraphRAG system using Mem0's graph memory functionality. This solution will focus exclusively on Mem0 for the knowledge graph component without relying on other frameworks.
graphmem/
├── app.py # FastAPI application
├── ingest.py # Ingestion logic
├── query.py # Query processing
├── parsers/ # Application-specific parsers
│ ├── __init__.py
│ ├── base.py # Base parser class
│ ├── whatsapp.py # WhatsApp parser
│ ├── github.py # GitHub parser
│ └── generic.py # Generic content parser
├── memory/ # Memory management
│ ├── __init__.py
│ └── graph_memory.py # Mem0 graph memory implementation
├── utils/ # Utility functions
│ ├── __init__.py
│ └── helpers.py # Helper functions
├── config.py # Configuration
├── requirements.txt # Dependencies
└── demo.ipynb # Demo notebook
Create a requirements.txt file:
mem0ai==0.2.4
fastapi==0.104.1
uvicorn==0.24.0
python-dotenv==1.0.0
pydantic==2.4.2
openai==1.3.0
pillow==10.1.0
pytesseract==0.3.10
Install the dependencies:
pip install -r requirements.txt
# memory/graph_memory.py
from typing import Dict, Any, List, Optional, Tuple
import logging
from mem0 import Memory
# Configure logging at import time (asks for INFO level on the root logger)
# and create the module-level logger for this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GraphMemory:
    """Wrapper around Mem0's memory for our application.

    Entities and relationships are stored as plain-text memories. Their
    structured fields are carried in the memory's metadata so that searches
    can filter on them and rebuild the original dictionaries.

    NOTE(review): the store is called the way Mem0's open-source SDK documents
    it -- ``add(messages, user_id=..., metadata=...)`` and
    ``search(query, user_id=..., filters=...)``. Confirm against the installed
    mem0ai version before relying on this.
    """

    # Metadata keys written by add_relationship() itself. Caller-supplied
    # extras must never overwrite them, otherwise a record could lose its
    # ``type == "relationship"`` marker and become invisible to searches.
    _RESERVED_RELATIONSHIP_KEYS = frozenset(
        {
            "source",
            "target",
            "relation",
            "description",
            "source_app",
            "type",
            "relationship_id",
        }
    )

    def __init__(self, user_id: str = "user123"):
        """
        Initialize the graph memory.

        Args:
            user_id: Identifier for the user
        """
        self.memory = Memory()
        self.user_id = user_id
        # Initialize a unique session for this instance
        self.session_id = f"session_{user_id}"

    @staticmethod
    def _hits(raw: Any) -> List[Any]:
        """Normalise a search response to a plain list of hits.

        Accepts either a ``{"results": [...]}`` mapping or a bare iterable.
        """
        if isinstance(raw, dict):
            raw = raw.get("results")
        return list(raw or [])

    @staticmethod
    def _hit_field(hit: Any, name: str, default: Any = None) -> Any:
        """Read ``name`` from a hit that may be a dict or an attribute object."""
        if isinstance(hit, dict):
            value = hit.get(name, default)
        else:
            value = getattr(hit, name, default)
        return default if value is None else value

    def add_entity(self, entity: Dict[str, Any]) -> None:
        """
        Add an entity to the graph memory.

        Args:
            entity: Dictionary containing entity information. The keys
                'entity', 'type' and 'description' are required; 'source'
                is optional and defaults to "unknown".

        Raises:
            Exception: Any error raised while building or storing the entity
                is logged and re-raised.
        """
        try:
            # Create a unique entity ID
            entity_id = f"{entity['type']}:{entity['entity']}"
            # Store entity in memory
            self.memory.add(
                f"Entity: {entity['entity']} is a {entity['type']}. {entity['description']}",
                user_id=self.user_id,
                metadata={
                    "type": "entity",
                    "entity_type": entity["type"],
                    "entity_id": entity_id,
                    "entity_name": entity["entity"],
                    "source": entity.get("source", "unknown"),
                    "description": entity["description"],
                },
            )
            logger.info("Added entity: %s", entity["entity"])
        except Exception as exc:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Error adding entity: %s", exc)
            raise

    def add_relationship(self, relationship: Dict[str, Any]) -> None:
        """
        Add a relationship to the graph memory.

        Args:
            relationship: Dictionary containing relationship information. The
                keys 'source', 'target', 'relation' and 'description' are
                required; 'source_app' defaults to "unknown". Any other keys
                are stored as extra metadata, except 'type' and
                'relationship_id', which are reserved for internal use.

        Raises:
            Exception: Any error raised while building or storing the
                relationship is logged and re-raised.
        """
        try:
            # Create a unique relationship ID
            rel_id = f"{relationship['source']}_{relationship['relation']}_{relationship['target']}"
            extras = {
                key: value
                for key, value in relationship.items()
                if key not in self._RESERVED_RELATIONSHIP_KEYS
            }
            # Store relationship in memory
            self.memory.add(
                f"Relationship: {relationship['source']} {relationship['relation']} {relationship['target']}. {relationship['description']}",
                user_id=self.user_id,
                metadata={
                    # Extras go first so the reserved fields below always win.
                    **extras,
                    "type": "relationship",
                    "relationship_id": rel_id,
                    "source": relationship["source"],
                    "target": relationship["target"],
                    "relation": relationship["relation"],
                    "source_app": relationship.get("source_app", "unknown"),
                    "description": relationship["description"],
                },
            )
            logger.info(
                "Added relationship: %s %s %s",
                relationship["source"],
                relationship["relation"],
                relationship["target"],
            )
        except Exception as exc:
            logger.exception("Error adding relationship: %s", exc)
            raise

    def search_entities(self, query: str, entity_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Search for entities in the graph memory.

        Args:
            query: Search query
            entity_type: Optional type to filter results

        Returns:
            List of matching entities, each with the keys 'entity', 'type',
            'description', 'source' and 'score'. An empty list is returned
            (and the error logged) if the search fails.
        """
        # Create filter based on entity type
        filters = {"type": "entity"}
        if entity_type:
            filters["entity_type"] = entity_type
        try:
            raw = self.memory.search(query, user_id=self.user_id, filters=filters)
            entities = []
            for hit in self._hits(raw):
                metadata = self._hit_field(hit, "metadata", {})
                entities.append(
                    {
                        "entity": metadata.get("entity_name", ""),
                        "type": metadata.get("entity_type", ""),
                        "description": metadata.get("description", ""),
                        "source": metadata.get("source", "unknown"),
                        "score": self._hit_field(hit, "score"),
                    }
                )
            return entities
        except Exception as exc:
            # Best-effort search: report the failure but do not propagate it.
            logger.exception("Error searching entities: %s", exc)
            return []

    def search_relationships(self, query: str, relation_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Search for relationships in the graph memory.

        Args:
            query: Search query
            relation_type: Optional relation type to filter results

        Returns:
            List of matching relationships, each with the keys 'source',
            'target', 'relation', 'description', 'source_app' and 'score',
            plus any extra metadata stored with the relationship. An empty
            list is returned (and the error logged) if the search fails.
        """
        # Create filter based on relation type
        filters = {"type": "relationship"}
        if relation_type:
            filters["relation"] = relation_type
        # 'score' is excluded too so stored metadata cannot mask the hit's score.
        skipped = self._RESERVED_RELATIONSHIP_KEYS | {"score"}
        try:
            raw = self.memory.search(query, user_id=self.user_id, filters=filters)
            relationships = []
            for hit in self._hits(raw):
                metadata = self._hit_field(hit, "metadata", {})
                record = {
                    "source": metadata.get("source", ""),
                    "target": metadata.get("target", ""),
                    "relation": metadata.get("relation", ""),
                    "description": metadata.get("description", ""),
                    "source_app": metadata.get("source_app", "unknown"),
                    "score": self._hit_field(hit, "score"),
                }
                record.update(
                    (key, value) for key, value in metadata.items() if key not in skipped
                )
                relationships.append(record)
            return relationships
        except Exception as exc:
            # Best-effort search: report the failure but do not propagate it.
            logger.exception("Error searching relationships: %s", exc)
            return []

    def get_team_members(self) -> List[str]:
        """
        Get all team members from the graph memory.

        Returns:
            Alphabetically sorted list of unique team member names. The
            placeholder name "User" is never included.
        """
        # Search for team member relationships
        relationships = self.search_relationships("team_member", "team_member")
        # Either end of the relationship may be the team member.
        members = {
            name
            for rel in relationships
            for name in (rel["source"], rel["target"])
            if name != "User"
        }
        # Sorted so callers get a stable order instead of set iteration order.
        return sorted(members)

    def get_unreplied_messages(self) -> List[Dict[str, Any]]:
        """
        Get all unreplied messages from the graph memory.

        Returns:
            List of dictionaries with the keys 'sender', 'message',
            'timestamp' and 'source_app'.
        """
        # Search for unreplied message relationships
        relationships = self.search_relationships("unreplied message", "sent_unreplied_message")
        return [
            {
                "sender": rel["source"],
                "message": rel.get("message_content", ""),
                "timestamp": rel.get("timestamp", ""),
                "source_app": rel["source_app"],
            }
            for rel in relationships
        ]

    def get_prs_needing_review(self) -> List[Dict[str, Any]]:
        """
        Get all PRs needing review from the graph memory.

        Returns:
            List of dictionaries with the keys 'pr', 'repository', 'url'
            and 'description'.
        """
        # Search for PR review relationships
        relationships = self.search_relationships("PR needs review", "needs_review")
        return [
            {
                "pr": rel["source"],
                "repository": rel.get("repository", ""),
                "url": rel.get("url", ""),
                "description": rel["description"],
            }
            for rel in relationships
        ]
# parsers/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Tuple
class BaseParser(ABC):
    """Base class for all content parsers."""

    @abstractmethod
    def parse(self, content: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
        """
        Parse content and extract entities and relationships.

        Args:
            content: Dict containing screen content

        Returns:
            Tuple of (entities, relationships)
            - entities: List of dicts with keys 'entity', 'type', 'description', 'source'
            - relationships: List of dicts with keys 'source', 'target', 'relation', 'description', 'source_app'
        """

    def get_app_type(self, content: Dict[str, Any]) -> str:
        """
        Determine the application type from content.

        The check is a case-insensitive substring match on the 'application'
        value. GitLab is reported as 'github'.

        Args:
            content: Dict containing screen content

        Returns:
            'whatsapp', 'github' or 'generic'. A missing, empty or ``None``
            application name yields 'generic'.
        """
        # `or ""` covers an explicit None; str() covers non-string values.
        app_name = str(content.get("application") or "").lower()
        if "whatsapp" in app_name:
            return "whatsapp"
        if "github" in app_name or "gitlab" in app_name:
            return "github"
        return "generic"
# parsers/generic.py
import re
from typing import Dict, Any, List, Tuple
from .base import BaseParser
class GenericParser(BaseParser):
    """Parser for generic content.

    Uses simple capitalisation heuristics to find people and organisations,
    and links entities that are mentioned close to each other.
    """

    # Two capitalised words, e.g. "Jane Doe".
    _NAME_RE = re.compile(r"\b[A-Z][a-z]+ [A-Z][a-z]+\b")
    # Two or more capitalised words joined by a space or hyphen. The group is
    # non-capturing so that findall() returns the whole match.
    _ORG_RE = re.compile(r"\b[A-Z][A-Za-z]+(?:[ \-][A-Z][A-Za-z]+)+\b")

    def parse(self, content: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
        """Parse generic content to extract entities and relationships.

        Args:
            content: Dict containing screen content. The optional keys 'text',
                'elements' and 'application' are read.

        Returns:
            Tuple of (entities, relationships). Every pair of entities that
            `_are_related` accepts produces one 'mentioned_with' relationship.
        """
        # Extract text content
        text = self._extract_text(content)
        app_label = content.get('application', 'unknown application')
        app_source = content.get('application', 'unknown')

        # Extract potential entities (names, organizations)
        potential_entities = self._extract_potential_entities(text)
        entities = [
            {
                'entity': name,
                'type': entity_type,
                'description': f"Entity found in {app_label}",
                'source': app_source,
            }
            for name, entity_type in potential_entities
        ]

        # Extract potential relationships (simplified): every unordered pair
        relationships = []
        for index, (first, _) in enumerate(potential_entities):
            for second, _ in potential_entities[index + 1:]:
                if self._are_related(first, second, text):
                    relationships.append({
                        'source': first,
                        'target': second,
                        'relation': 'mentioned_with',
                        'description': f"Entities mentioned together in {app_label}",
                        'source_app': app_source,
                    })
        return entities, relationships

    def _extract_text(self, content: Dict[str, Any]) -> str:
        """Extract all text from content.

        Concatenates the top-level 'text' value and every string 'content'
        of the entries in 'elements', each followed by a single space.
        Non-string values are skipped.
        """
        pieces = []
        # Extract direct text
        direct = content.get('text')
        if isinstance(direct, str):
            pieces.append(direct)
        # Extract text from elements
        for element in content.get('elements') or []:
            if isinstance(element, dict) and isinstance(element.get('content'), str):
                pieces.append(element['content'])
        return "".join(piece + " " for piece in pieces)

    def _extract_potential_entities(self, text: str) -> List[Tuple[str, str]]:
        """Extract potential entities from text.

        Returns:
            List of unique (name, type) tuples in order of first appearance:
            all 'PERSON' matches first, then 'ORGANIZATION' matches that were
            not already reported as a person.
        """
        # dict.fromkeys() removes repeats while keeping first-seen order, so a
        # name mentioned twice cannot produce a relationship with itself.
        names = list(dict.fromkeys(self._NAME_RE.findall(text)))
        entities = [(name, 'PERSON') for name in names]
        seen = set(names)
        for org in dict.fromkeys(self._ORG_RE.findall(text)):
            if org not in seen:  # Avoid duplicates
                entities.append((org, 'ORGANIZATION'))
        return entities

    def _are_related(self, entity1: str, entity2: str, text: str) -> bool:
        """Check if two entities are related in the text.

        Simple heuristic: the first occurrences of both entities start within
        50 characters of each other.
        """
        first_pos = text.find(entity1)
        second_pos = text.find(entity2)
        if first_pos == -1 or second_pos == -1:
            return False
        return abs(first_pos - second_pos) < 50
# parsers/whatsapp.py
from typing import Dict, Any, List, Tuple
from .base import BaseParser
import re
class WhatsAppParser(BaseParser):
    """Parser for WhatsApp content.

    Reports chat participants as PERSON entities and every unreplied message
    from someone other than 'User' as a 'sent_unreplied_message' relationship.
    """

    def parse(self, content: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
        """Parse WhatsApp content to extract entities and relationships.

        Args:
            content: Dict containing screen content; its 'elements' list is
                scanned for entries of type 'header' and 'message'.

        Returns:
            Tuple of (entities, relationships).
        """
        # Extract messages once and reuse them for participants and replies.
        messages = self._extract_messages(content)
        participants = self._collect_participants(content, messages)

        # Add participants as entities
        entities = [
            {
                'entity': participant,
                'type': 'PERSON',
                'description': "WhatsApp contact",
                'source': 'whatsapp',
            }
            for participant in participants
        ]

        # Process messages to find unreplied ones
        relationships = []
        for message in messages:
            if message.get('replied', True):
                continue
            sender = message.get('sender')
            if not sender or sender == 'User':
                continue
            relationships.append({
                'source': sender,
                'target': 'User',
                'relation': 'sent_unreplied_message',
                'description': f"Unreplied message: {message.get('content', '')}",
                'source_app': 'whatsapp',
                'timestamp': message.get('timestamp'),
                'message_content': message.get('content', ''),
            })
        return entities, relationships

    def _extract_text(self, content: Dict[str, Any]) -> str:
        """Extract all text from content.

        Concatenates the top-level 'text' value and every string 'content'
        of the entries in 'elements', each followed by a single space.
        Non-string values are skipped.
        """
        pieces = []
        # Extract direct text
        direct = content.get('text')
        if isinstance(direct, str):
            pieces.append(direct)
        # Extract text from elements
        for element in content.get('elements') or []:
            if isinstance(element, dict) and isinstance(element.get('content'), str):
                pieces.append(element['content'])
        return "".join(piece + " " for piece in pieces)

    def _extract_participants(self, content: Dict[str, Any]) -> List[str]:
        """Extract chat participants from WhatsApp content.

        Returns:
            Unique participant names: header contents first, then message
            senders other than 'User', in order of first appearance.
        """
        return self._collect_participants(content, self._extract_messages(content))

    def _collect_participants(self, content: Dict[str, Any], messages: List[Dict]) -> List[str]:
        """Build the participant list from headers and already-extracted messages."""
        participants = []
        # Extract from header elements
        for element in content.get('elements') or []:
            if not isinstance(element, dict) or element.get('type') != 'header':
                continue
            # Typically the chat header contains the name of the contact or group
            header = element.get('content')
            if isinstance(header, str) and header and header not in participants:
                participants.append(header)
        # Extract from message senders
        for message in messages:
            sender = message.get('sender')
            if sender and sender != 'User' and sender not in participants:
                participants.append(sender)
        return participants

    def _extract_messages(self, content: Dict[str, Any]) -> List[Dict]:
        """Extract messages from WhatsApp content.

        Returns:
            One dict per element of type 'message' with the keys 'sender'
            (default 'Unknown'), 'content' (default ''), 'timestamp' and
            'replied'.
        """
        return [
            {
                'sender': element.get('sender', 'Unknown'),
                'content': element.get('content', ''),
                'timestamp': element.get('timestamp'),
                'replied': self._is_replied(element),
            }
            for element in content.get('elements') or []
            if isinstance(element, dict) and element.get('type') == 'message'
        ]

    def _is_replied(self, message_element: Dict) -> bool:
        """Determine if a message has been replied to.

        Reads 'replied' and then 'has_reply_indicator' from the element's
        'attributes' mapping and uses the first one present.
        """
        attrs = message_element.get('attributes')
        if isinstance(attrs, dict):
            # Check for reply indicators in the message element
            if 'replied' in attrs:
                return bool(attrs['replied'])
            # Check for visual indicators that might suggest a reply
            if 'has_reply_indicator' in attrs:
                return bool(attrs['has_reply_indicator'])
        # Default to True (assuming replied) to avoid false positives
        return True