This POC implements a contextual AI assistant that processes screen content and answers complex questions using Mem0's graph memory capabilities and GraphRAG techniques.
graphrag_assistant/
│
├── api/ # API layer
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── ingest.py # Ingestion endpoint
│ │ └── query.py # Query endpoint
│ └── models/
│ ├── __init__.py
│ ├── request.py # Request schemas
│ └── response.py # Response schemas
│
├── core/ # Core functionality
│ ├── __init__.py
│ ├── parsers/ # Content parsers
│ │ ├── __init__.py
│ │ ├── base.py # Base parser class
│ │ ├── whatsapp.py # WhatsApp parser
│ │ ├── github.py # GitHub parser
│ │ └── generic.py # Generic text parser
│ │
│ ├── extraction/ # Entity extraction
│ │ ├── __init__.py
│ │ ├── entity.py # Entity extraction
│ │ └── relation.py # Relationship extraction
│ │
│ └── mem0/ # Mem0 integration
│ ├── __init__.py
│ ├── client.py # Mem0 client wrapper
│ ├── schema.py # Graph schema definitions
│ └── queries.py # Query templates
│
├── services/ # Service integrations
│ ├── __init__.py
│ ├── mem0_service.py # Mem0 service
│ ├── llm_service.py # LLM service
│ └── embedding_service.py # Embedding service
│
├── utils/ # Utilities
│ ├── __init__.py
│ ├── text.py # Text processing
│ └── config.py # Configuration
│
├── tests/ # Tests
│ ├── __init__.py
│ ├── test_parsers.py
│ ├── test_extraction.py
│ └── test_queries.py
│
├── config.yaml # Configuration file
├── requirements.txt # Dependencies
├── Dockerfile # Docker configuration
└── main.py # Application entry point
# Application configuration
app:
  name: "GraphRAG Assistant"
  version: "0.1.0"
  log_level: "INFO"

# API configuration
api:
  host: "0.0.0.0"
  port: 8000
  debug: true

# Mem0 configuration
mem0:
  api_key: "${MEM0_API_KEY}"  # Set via environment variable
  project_id: "${MEM0_PROJECT_ID}"
  endpoints:
    graph: "https://api.mem0.ai/v1/graph"
    vector: "https://api.mem0.ai/v1/vector"
    key_value: "https://api.mem0.ai/v1/kv"

# LLM configuration
llm:
  provider: "openai"
  model: "gpt-4"
  api_key: "${OPENAI_API_KEY}"
  temperature: 0.2
  max_tokens: 1000

# Embedding configuration
embedding:
  model: "sentence-transformers/all-mpnet-base-v2"
  dimension: 768
  batch_size: 32

# Parser configuration
parsers:
  whatsapp:
    enabled: true
    patterns:
      message: "^\\[\\d{2}:\\d{2}, \\d{1,2}/\\d{1,2}/\\d{4}\\] (.+?): (.+)$"
  github:
    enabled: true
  generic:
    enabled: true
import uvicorn
import yaml
import os
from api.main import create_app
from utils.config import load_config
def main():
    """Entry point: load configuration, build the FastAPI app, start uvicorn."""
    settings = load_config()
    application = create_app(settings)

    api_cfg = settings["api"]
    uvicorn.run(
        application,
        host=api_cfg["host"],
        port=api_cfg["port"],
        log_level=settings["app"]["log_level"].lower(),
    )


if __name__ == "__main__":
    main()
import os
import yaml
from pathlib import Path
def load_config(config_path="config.yaml"):
    """Load configuration from a YAML file with environment variable substitution.

    Args:
        config_path: Path to the YAML configuration file.

    Returns:
        The parsed configuration with every "${VAR}" string value replaced
        by the corresponding environment variable (or "" when unset).
    """
    # Explicit encoding: the platform-default encoding may not be UTF-8,
    # which would mis-read configs containing non-ASCII text.
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # Substitute "${VAR}" placeholders with environment values
    return _process_env_vars(config)
def _process_env_vars(config):
"""Recursively process environment variables in config."""
if isinstance(config, dict):
return {k: _process_env_vars(v) for k, v in config.items()}
elif isinstance(config, list):
return [_process_env_vars(i) for i in config]
elif isinstance(config, str) and config.startswith("${") and config.endswith("}"):
# Extract environment variable name
env_var = config[2:-1]
# Get value from environment or use empty string if not set
return os.environ.get(env_var, "")
else:
return config
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from .routers import ingest, query
from services.mem0_service import Mem0Service
from services.llm_service import LLMService
from services.embedding_service import EmbeddingService
# Registry of service singletons, populated during application startup
services = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create service singletons on startup and dispose of them on shutdown."""
    config = app.state.config

    mem0_cfg = config["mem0"]
    services["mem0"] = Mem0Service(
        api_key=mem0_cfg["api_key"],
        project_id=mem0_cfg["project_id"],
        endpoints=mem0_cfg["endpoints"],
    )

    llm_cfg = config["llm"]
    services["llm"] = LLMService(
        provider=llm_cfg["provider"],
        model=llm_cfg["model"],
        api_key=llm_cfg["api_key"],
        temperature=llm_cfg["temperature"],
        max_tokens=llm_cfg["max_tokens"],
    )

    emb_cfg = config["embedding"]
    services["embedding"] = EmbeddingService(
        model=emb_cfg["model"],
        dimension=emb_cfg["dimension"],
        batch_size=emb_cfg["batch_size"],
    )

    # Application serves requests between startup and shutdown
    yield

    # Shutdown: close any service that exposes an async close() hook
    for service in services.values():
        if hasattr(service, "close"):
            await service.close()
def create_app(config):
    """Build and configure the FastAPI application from a config mapping."""
    application = FastAPI(
        title=config["app"]["name"],
        version=config["app"]["version"],
        lifespan=lifespan,
    )
    # Expose the config so the lifespan handler and endpoints can read it
    application.state.config = config

    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # a very permissive CORS policy — confirm this is intended outside dev.
    application.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    def get_services():
        # Looked up at request time, after startup has populated the registry
        return services

    application.include_router(ingest.router, dependencies=[Depends(get_services)])
    application.include_router(query.router, dependencies=[Depends(get_services)])

    return application
# api/models/request.py
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
class ScreenContent(BaseModel):
    """Request payload for ingestion: one captured screen snapshot."""
    timestamp: datetime = Field(..., description="Capture timestamp")
    source: Dict[str, str] = Field(..., description="Source metadata (app, title, URL)")
    content_type: str = Field(..., description="Content type (chat, pr, document)")
    raw_text: str = Field(..., description="Raw text content")
    structured_data: Optional[Dict[str, Any]] = Field(None, description="Structured data")
class QueryRequest(BaseModel):
    """Request payload for the chat-completion endpoint."""
    user_id: str = Field(..., description="User identifier")
    query: str = Field(..., description="User query")
    conversation_id: Optional[str] = Field(None, description="Conversation identifier")
    context: Optional[Dict[str, str]] = Field(None, description="Current context")
# api/models/response.py
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
class IngestResponse(BaseModel):
    """Response payload summarizing an ingestion run."""
    status: str = Field(..., description="Processing status")
    processed_entities: int = Field(0, description="Number of entities processed")
    processed_relationships: int = Field(0, description="Number of relationships processed")
    details: Optional[Dict[str, Any]] = Field(None, description="Processing details")
class Source(BaseModel):
    """One information source cited in a query response."""
    type: str = Field(..., description="Source type (entity, document)")
    id: str = Field(..., description="Source identifier")
    content: str = Field(..., description="Source content")
class SuggestedAction(BaseModel):
    """One action the assistant suggests to the user (e.g. a drafted reply)."""
    type: str = Field(..., description="Action type (reply, review)")
    target: str = Field(..., description="Action target")
    content: str = Field(..., description="Action content")
class QueryResponse(BaseModel):
    """Response payload for the chat-completion endpoint."""
    response: str = Field(..., description="Response text")
    sources: Optional[List[Source]] = Field(None, description="Information sources")
    suggested_actions: Optional[List[SuggestedAction]] = Field(None, description="Suggested actions")
# api/routers/ingest.py
from fastapi import APIRouter, Depends, HTTPException
from typing import Dict, Any
from ..models.request import ScreenContent
from ..models.response import IngestResponse
from core.parsers import get_parser
from core.extraction.entity import EntityExtractor
from core.extraction.relation import RelationshipExtractor
router = APIRouter(prefix="/ingest", tags=["Ingestion"])


def _get_services() -> Dict[str, Any]:
    """FastAPI dependency returning the shared service registry.

    Imported lazily to avoid a circular import: api.main imports this router
    at module load time, before its `services` dict exists.
    """
    from api.main import services
    return services


@router.post("/", response_model=IngestResponse)
async def ingest_screen_content(
    content: ScreenContent,
    # Fix: a bare Depends() uses the type annotation (Dict[str, Any]) as the
    # dependency callable, which never injects the services registry.
    services: Dict[str, Any] = Depends(_get_services)
):
    """Ingest screen content: parse it, extract a graph, store it in Mem0.

    Returns an IngestResponse summarizing how many entities and
    relationships were stored; any failure surfaces as HTTP 500.
    """
    try:
        # Pick a parser from the declared content type and source application
        parser = get_parser(content.content_type, content.source.get("application"))
        parsed_content = parser.parse(
            content.raw_text,
            structured_data=content.structured_data,
            metadata=content.source
        )

        # Extract entities, then relationships between them
        entities = EntityExtractor().extract(parsed_content)
        relationships = RelationshipExtractor().extract(parsed_content, entities)

        mem0_service = services["mem0"]

        # Store entities, remembering the Mem0 id assigned to each local id
        entity_ids = {}
        for entity in entities:
            entity_ids[entity["id"]] = mem0_service.store_entity(
                entity_type=entity["type"],
                properties=entity["properties"]
            )

        # Store relationships, translating local ids to Mem0 ids
        for relationship in relationships:
            mem0_service.store_relationship(
                from_id=entity_ids[relationship["from_id"]],
                to_id=entity_ids[relationship["to_id"]],
                relationship_type=relationship["type"],
                properties=relationship.get("properties", {})
            )

        return IngestResponse(
            status="success",
            processed_entities=len(entities),
            processed_relationships=len(relationships),
            details={
                # Sorted lists rather than sets: deterministic order and
                # directly JSON-serializable
                "entity_types": sorted({entity["type"] for entity in entities}),
                "relationship_types": sorted({rel["type"] for rel in relationships})
            }
        )
    except Exception as e:
        # Surface any processing failure as a 500 with the error message,
        # preserving the original traceback via exception chaining
        raise HTTPException(status_code=500, detail=str(e)) from e
# api/routers/query.py
from fastapi import APIRouter, Depends, HTTPException
from typing import Dict, Any
from ..models.request import QueryRequest
from ..models.response import QueryResponse, Source, SuggestedAction
from core.mem0.queries import get_query_handler
router = APIRouter(prefix="/chat_completion", tags=["Query"])


def _get_services() -> Dict[str, Any]:
    """FastAPI dependency returning the shared service registry.

    Imported lazily to avoid a circular import: api.main imports this router
    at module load time, before its `services` dict exists.
    """
    from api.main import services
    return services


@router.post("/", response_model=QueryResponse)
async def process_query(
    request: QueryRequest,
    # Fix: a bare Depends() uses the type annotation (Dict[str, Any]) as the
    # dependency callable, which never injects the services registry.
    services: Dict[str, Any] = Depends(_get_services)
):
    """Answer a user query using graph memory retrieval plus LLM generation.

    Returns the generated text with optional cited sources and, for
    "unanswered messages" queries, drafted reply suggestions.
    """
    try:
        mem0_service = services["mem0"]
        llm_service = services["llm"]
        embedding_service = services["embedding"]

        # Classify the query so the matching graph-query template is used.
        # NOTE(review): query_embedding is computed but never consumed below —
        # confirm whether the handler should receive it or drop the call.
        query_embedding = embedding_service.embed_text(request.query)
        query_intent = llm_service.classify_intent(request.query)

        # Execute the intent-specific query against Mem0
        query_handler = get_query_handler(query_intent)
        query_results = query_handler.execute(
            mem0_service=mem0_service,
            user_id=request.user_id,
            query=request.query,
            context=request.context
        )

        # Build the LLM context from graph results and generate the answer
        context = query_handler.prepare_context(query_results)
        response_text = llm_service.generate_response(
            query=request.query,
            context=context
        )

        # Collect cited sources returned by the query handler
        sources = [
            Source(type=source["type"], id=source["id"], content=source["content"])
            for source in query_results.get("sources", [])
        ]

        # For "unanswered messages" queries, draft a suggested reply per message
        suggested_actions = []
        if query_intent == "unanswered_messages" and query_results.get("messages"):
            for message in query_results["messages"]:
                draft = llm_service.generate_reply_draft(
                    conversation=message.get("conversation_history", []),
                    last_message=message["text"]
                )
                suggested_actions.append(
                    SuggestedAction(
                        type="reply",
                        target=message["conversation_id"],
                        content=draft
                    )
                )

        return QueryResponse(
            response=response_text,
            sources=sources or None,
            suggested_actions=suggested_actions or None
        )
    except Exception as e:
        # Preserve the original traceback via exception chaining
        raise HTTPException(status_code=500, detail=str(e)) from e
# core/parsers/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class BaseParser(ABC):
    """Base class for content parsers.

    Subclasses turn raw screen text (plus optional structured data and
    source metadata) into a structured dict for the extraction layer.
    """
    @abstractmethod
    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse raw text content into structured data.

        Args:
            raw_text: Text captured from the screen.
            structured_data: Pre-parsed data, used in preference to raw_text.
            metadata: Source metadata (application, window title, URL).

        Returns:
            A dict describing the parsed content.
        """
        pass
# core/parsers/__init__.py
from typing import Dict, Any, Optional
from .base import BaseParser
from .whatsapp import WhatsAppParser
from .github import GitHubParser
from .generic import GenericParser
def get_parser(content_type: str, application: Optional[str] = None) -> BaseParser:
    """Select a parser for the given content type and source application.

    Falls back to GenericParser when no specialized parser matches.
    """
    is_whatsapp_chat = content_type == "chat" and application == "WhatsApp"
    if is_whatsapp_chat:
        return WhatsAppParser()

    is_code_review = content_type in ("pr", "issue") and application in ("GitHub", "GitLab")
    if is_code_review:
        return GitHubParser()

    return GenericParser()
# core/parsers/whatsapp.py
import re
from datetime import datetime
from typing import Dict, Any, Optional, List
from .base import BaseParser
class WhatsAppParser(BaseParser):
    """Parser for WhatsApp conversations.

    Expects messages in the export format "[HH:MM, D/M/YYYY] Sender: text".
    """
    def __init__(self):
        # One message per line: "[HH:MM, D/M/YYYY] Sender: text".
        # Fix: the original pattern's backslashes were doubly escaped
        # (e.g. "\\\\[" inside a raw string), so it could never match.
        self.message_pattern = re.compile(
            r"^\[(\d{2}:\d{2}), (\d{1,2}/\d{1,2}/\d{4})\] (.+?): (.+)$",
            re.MULTILINE
        )

    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse a WhatsApp conversation into a structured message list.

        Args:
            raw_text: Raw chat text captured from the screen.
            structured_data: If it contains "messages", they are used as-is.
            metadata: Source metadata; "window_title" supplies the chat title.

        Returns:
            A dict with type/application/title and a list of messages, each
            carrying sender, text, and an ISO-8601 timestamp.
        """
        # Fix: the declared default for metadata is None, which would crash
        # the .get() calls below
        metadata = metadata or {}

        # Prefer already-structured messages when provided
        if structured_data and "messages" in structured_data:
            return {
                "type": "conversation",
                "application": "WhatsApp",
                "title": metadata.get("window_title", "WhatsApp Chat"),
                "messages": structured_data["messages"]
            }

        # Otherwise parse messages out of the raw text
        messages = []
        for match in self.message_pattern.finditer(raw_text):
            time_str, date_str, sender, text = match.groups()
            try:
                # Convert D/M/YYYY + HH:MM to a zero-padded ISO-8601 timestamp
                day, month, year = date_str.split("/")
                timestamp = f"{int(year):04d}-{int(month):02d}-{int(day):02d}T{time_str}:00"
            except ValueError:
                # Fall back to "now" if the date is malformed
                # (was a bare except:, which hid unrelated errors)
                timestamp = datetime.now().isoformat()
            messages.append({
                "sender": sender,
                "text": text,
                "timestamp": timestamp
            })

        return {
            "type": "conversation",
            "application": "WhatsApp",
            "title": metadata.get("window_title", "WhatsApp Chat"),
            "messages": messages
        }
# core/parsers/github.py
import re
from typing import Dict, Any, Optional, List
from .base import BaseParser
class GitHubParser(BaseParser):
    """Parser for GitHub content (pull-request pages and generic pages)."""

    # Matches e.g. https://github.com/owner/repo/pull/123 and captures
    # owner, repo, and PR number. Fix: the original pattern's backslashes
    # were doubly escaped ("github\\\\.com", "(\\\\d+)"), so it never matched;
    # compiled once here instead of on every call.
    _PR_URL_PATTERN = re.compile(r"github\.com/([^/]+)/([^/]+)/pull/(\d+)")

    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse GitHub content.

        Prefers structured_data when given; otherwise detects a PR page from
        the URL in metadata, falling back to a generic GitHub page dict.
        """
        # Fix: the declared default for metadata is None, which would crash
        # the .get() calls below
        metadata = metadata or {}

        # Use structured data if available
        if structured_data:
            return structured_data

        # Detect a pull-request page from the captured URL
        url = metadata.get("url", "")
        if "github.com" in url and "/pull/" in url:
            return self._parse_pr_from_url(url, raw_text, metadata)

        # Fallback to generic parsing
        return self._parse_generic_github(raw_text, metadata)

    def _parse_pr_from_url(self, url: str, raw_text: str, metadata: Dict[str, str]) -> Dict[str, Any]:
        """Parse PR information from a GitHub PR URL plus visible page text."""
        match = self._PR_URL_PATTERN.search(url)
        if not match:
            return self._parse_generic_github(raw_text, metadata)
        owner, repo, pr_number = match.groups()

        # Window titles for PRs usually read "<title> by <author> ..."
        title = metadata.get("window_title", "")
        if " by " in title:
            title = title.split(" by ")[0].strip()

        # Infer PR status from status badges visible in the page text
        status = "open"
        if "Merged" in raw_text:
            status = "merged"
        elif "Closed" in raw_text:
            status = "closed"

        # Heuristic: reviewer names appear between the "Reviewers" and
        # "Assignees" sidebar sections
        reviewers = []
        review_section = re.search(r"Reviewers(.*?)Assignees", raw_text, re.DOTALL)
        if review_section:
            reviewers_text = review_section.group(1)
            reviewers = [name.strip() for name in reviewers_text.split() if name.strip()]

        return {
            "type": "pull_request",
            "application": "GitHub",
            "repository": f"{owner}/{repo}",
            "pr_number": pr_number,
            "title": title,
            "url": url,
            "status": status,
            "reviewers": reviewers
        }

    def _parse_generic_github(self, raw_text: str, metadata: Dict[str, str]) -> Dict[str, Any]:
        """Fallback parser for generic GitHub content."""
        return {
            "type": "github_page",
            "application": "GitHub",
            "title": metadata.get("window_title", "GitHub"),
            "url": metadata.get("url", ""),
            "content": raw_text
        }
# core/parsers/generic.py
from typing import Dict, Any, Optional
from .base import BaseParser
class GenericParser(BaseParser):
    """Fallback parser for unstructured content from any application."""
    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse generic content into a typed document dict.

        The content type (document/email/conversation/task) is guessed from
        the application name and window title in metadata.
        """
        # Fix: the declared default for metadata is None, which would crash
        # the .get() calls below
        metadata = metadata or {}

        # Use structured data if available
        if structured_data:
            return structured_data

        application = metadata.get("application", "unknown")
        title = metadata.get("window_title", "Untitled")

        # Heuristic content-type classification from the application name;
        # lowercase once instead of per comparison
        app_lower = application.lower()
        content_type = "document"
        if "email" in app_lower or "@" in title:
            content_type = "email"
        elif any(app in app_lower for app in ("chat", "slack", "teams", "messenger")):
            content_type = "conversation"
        elif any(app in app_lower for app in ("jira", "trello", "asana")):
            content_type = "task"

        return {
            "type": content_type,
            "application": application,
            "title": title,
            "url": metadata.get("url", ""),
            "content": raw_text
        }