Complete End-to-End POC for GraphRAG Assistant with Mem0

This POC implements a contextual AI assistant that processes screen content and answers complex questions using Mem0's graph memory capabilities and GraphRAG techniques.
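
From a client's point of view the flow is: capture screen content, POST it to the ingestion endpoint, then ask questions through the chat-completion endpoint. The sketch below (using the requests library against the default localhost:8000 from config.yaml; the payload values are made up) illustrates that round trip with the ScreenContent and QueryRequest schemas defined in section 2.5.

import requests

BASE_URL = "http://localhost:8000"  # default host/port from config.yaml

# 1. Ingest a captured WhatsApp screen (fields mirror the ScreenContent model)
ingest_payload = {
    "timestamp": "2025-03-12T09:15:00",
    "source": {"application": "WhatsApp", "window_title": "Team Chat"},
    "content_type": "chat",
    "raw_text": "[09:15, 12/3/2025] Alice: Can you review my PR today?",
}
print(requests.post(f"{BASE_URL}/ingest/", json=ingest_payload).json())

# 2. Ask a question against the accumulated graph memory (QueryRequest model)
query_payload = {
    "user_id": "user-123",
    "query": "Which messages from Alice still need a reply?",
}
print(requests.post(f"{BASE_URL}/chat_completion/", json=query_payload).json())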

1. Project Structure

graphrag_assistant/
│
├── api/                           # API layer
│   ├── __init__.py
│   ├── main.py                    # FastAPI application
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── ingest.py              # Ingestion endpoint
│   │   └── query.py               # Query endpoint
│   └── models/
│       ├── __init__.py
│       ├── request.py             # Request schemas
│       └── response.py            # Response schemas
│
├── core/                          # Core functionality
│   ├── __init__.py
│   ├── parsers/                   # Content parsers
│   │   ├── __init__.py
│   │   ├── base.py                # Base parser class
│   │   ├── whatsapp.py            # WhatsApp parser
│   │   ├── github.py              # GitHub parser
│   │   └── generic.py             # Generic text parser
│   │
│   ├── extraction/                # Entity extraction
│   │   ├── __init__.py
│   │   ├── entity.py              # Entity extraction
│   │   └── relation.py            # Relationship extraction
│   │
│   └── mem0/                      # Mem0 integration
│       ├── __init__.py
│       ├── client.py              # Mem0 client wrapper
│       ├── schema.py              # Graph schema definitions
│       └── queries.py             # Query templates
│
├── services/                      # Service integrations
│   ├── __init__.py
│   ├── mem0_service.py            # Mem0 service
│   ├── llm_service.py             # LLM service
│   └── embedding_service.py       # Embedding service
│
├── utils/                         # Utilities
│   ├── __init__.py
│   ├── text.py                    # Text processing
│   └── config.py                  # Configuration
│
├── tests/                         # Tests
│   ├── __init__.py
│   ├── test_parsers.py
│   ├── test_extraction.py
│   └── test_queries.py
│
├── config.yaml                    # Configuration file
├── requirements.txt               # Dependencies
├── Dockerfile                     # Docker configuration
└── main.py                        # Application entry point

2. Core Implementation Files

2.1 Configuration (config.yaml)

# Application configuration
app:
  name: "GraphRAG Assistant"
  version: "0.1.0"
  log_level: "INFO"

# API configuration
api:
  host: "0.0.0.0"
  port: 8000
  debug: true

# Mem0 configuration
mem0:
  api_key: "${MEM0_API_KEY}"  # Set via environment variable
  project_id: "${MEM0_PROJECT_ID}"
  endpoints:
    graph: "<https://api.mem0.ai/v1/graph>"
    vector: "<https://api.mem0.ai/v1/vector>"
    key_value: "<https://api.mem0.ai/v1/kv>"

# LLM configuration
llm:
  provider: "openai"
  model: "gpt-4"
  api_key: "${OPENAI_API_KEY}"
  temperature: 0.2
  max_tokens: 1000

# Embedding configuration
embedding:
  model: "sentence-transformers/all-mpnet-base-v2"
  dimension: 768
  batch_size: 32

# Parser configuration
parsers:
  whatsapp:
    enabled: true
    patterns:
      message: "^\\\\\\\\[\\\\\\\\d{2}:\\\\\\\\d{2}, \\\\\\\\d{1,2}/\\\\\\\\d{1,2}/\\\\\\\\d{4}\\\\\\\\] (.+?): (.+)$"
  github:
    enabled: true
  generic:
    enabled: true

2.2 Main Application (main.py)

import uvicorn
from api.main import create_app
from utils.config import load_config

def main():
    # Load configuration
    config = load_config()

    # Create FastAPI app
    app = create_app(config)

    # Run server
    uvicorn.run(
        app,
        host=config['api']['host'],
        port=config['api']['port'],
        log_level=config['app']['log_level'].lower()
    )

if __name__ == "__main__":
    main()

2.3 Configuration Utility (utils/config.py)

import os
import yaml
from pathlib import Path

def load_config(config_path="config.yaml"):
    """Load configuration from YAML file with environment variable substitution."""
    # Load YAML file
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    # Process environment variables
    config = _process_env_vars(config)

    return config

def _process_env_vars(config):
    """Recursively process environment variables in config."""
    if isinstance(config, dict):
        return {k: _process_env_vars(v) for k, v in config.items()}
    elif isinstance(config, list):
        return [_process_env_vars(i) for i in config]
    elif isinstance(config, str) and config.startswith("${") and config.endswith("}"):
        # Extract environment variable name
        env_var = config[2:-1]
        # Get value from environment or use empty string if not set
        return os.environ.get(env_var, "")
    else:
        return config
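
A quick illustration of the substitution behavior, assuming the environment variable is exported before startup (the key value here is just a placeholder):

import os
from utils.config import load_config

os.environ["MEM0_API_KEY"] = "test-key"        # placeholder value for illustration
config = load_config("config.yaml")
assert config["mem0"]["api_key"] == "test-key"  # "${MEM0_API_KEY}" was substituted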

2.4 FastAPI Application (api/main.py)

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager

from .routers import ingest, query
from services.mem0_service import Mem0Service
from services.llm_service import LLMService
from services.embedding_service import EmbeddingService

# Services dictionary to be populated during startup
services = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize services on startup
    config = app.state.config

    # Initialize Mem0 service
    services["mem0"] = Mem0Service(
        api_key=config["mem0"]["api_key"],
        project_id=config["mem0"]["project_id"],
        endpoints=config["mem0"]["endpoints"]
    )

    # Initialize LLM service
    services["llm"] = LLMService(
        provider=config["llm"]["provider"],
        model=config["llm"]["model"],
        api_key=config["llm"]["api_key"],
        temperature=config["llm"]["temperature"],
        max_tokens=config["llm"]["max_tokens"]
    )

    # Initialize Embedding service
    services["embedding"] = EmbeddingService(
        model=config["embedding"]["model"],
        dimension=config["embedding"]["dimension"],
        batch_size=config["embedding"]["batch_size"]
    )

    # Expose initialized services to request handlers via application state
    app.state.services = services

    yield

    # Cleanup on shutdown
    for service_name, service in services.items():
        if hasattr(service, "close"):
            await service.close()

def create_app(config):
    # Create FastAPI app
    app = FastAPI(
        title=config["app"]["name"],
        version=config["app"]["version"],
        lifespan=lifespan
    )

    # Store config in app state
    app.state.config = config

    # Add CORS middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Include routers (each resolves shared services from app.state via its own dependency)
    app.include_router(ingest.router)
    app.include_router(query.router)

    return app

2.5 Request/Response Models (api/models)

# api/models/request.py
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime

class ScreenContent(BaseModel):
    """Model for screen content ingestion."""
    timestamp: datetime = Field(..., description="Capture timestamp")
    source: Dict[str, str] = Field(..., description="Source metadata (app, title, URL)")
    content_type: str = Field(..., description="Content type (chat, pr, document)")
    raw_text: str = Field(..., description="Raw text content")
    structured_data: Optional[Dict[str, Any]] = Field(None, description="Structured data")

class QueryRequest(BaseModel):
    """Model for chat completion request."""
    user_id: str = Field(..., description="User identifier")
    query: str = Field(..., description="User query")
    conversation_id: Optional[str] = Field(None, description="Conversation identifier")
    context: Optional[Dict[str, str]] = Field(None, description="Current context")

# api/models/response.py
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any

class IngestResponse(BaseModel):
    """Model for ingestion response."""
    status: str = Field(..., description="Processing status")
    processed_entities: int = Field(0, description="Number of entities processed")
    processed_relationships: int = Field(0, description="Number of relationships processed")
    details: Optional[Dict[str, Any]] = Field(None, description="Processing details")

class Source(BaseModel):
    """Model for response source."""
    type: str = Field(..., description="Source type (entity, document)")
    id: str = Field(..., description="Source identifier")
    content: str = Field(..., description="Source content")

class SuggestedAction(BaseModel):
    """Model for suggested action."""
    type: str = Field(..., description="Action type (reply, review)")
    target: str = Field(..., description="Action target")
    content: str = Field(..., description="Action content")

class QueryResponse(BaseModel):
    """Model for chat completion response."""
    response: str = Field(..., description="Response text")
    sources: Optional[List[Source]] = Field(None, description="Information sources")
    suggested_actions: Optional[List[SuggestedAction]] = Field(None, description="Suggested actions")

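As a quick sanity check, the request models can be exercised directly; the snippet below builds an ingestion payload against the schema above (the values are illustrative):

from datetime import datetime
from api.models.request import ScreenContent

content = ScreenContent(
    timestamp=datetime.now(),
    source={"application": "WhatsApp", "window_title": "Team Chat"},
    content_type="chat",
    raw_text="[09:15, 12/3/2025] Alice: Can you review my PR today?",
)
print(content.json())  # use content.model_dump_json() on Pydantic v2
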
2.6 API Routers

# api/routers/ingest.py
from fastapi import APIRouter, Depends, HTTPException, Request
from typing import Dict, Any

from ..models.request import ScreenContent
from ..models.response import IngestResponse
from core.parsers import get_parser
from core.extraction.entity import EntityExtractor
from core.extraction.relation import RelationshipExtractor

router = APIRouter(prefix="/ingest", tags=["Ingestion"])

@router.post("/", response_model=IngestResponse)
async def ingest_screen_content(
    content: ScreenContent,
    services: Dict[str, Any] = Depends()
):
    """Ingest screen content and store in Mem0."""
    try:
        # Get appropriate parser based on content type and source
        parser = get_parser(content.content_type, content.source.get("application"))

        # Parse content
        parsed_content = parser.parse(
            content.raw_text,
            structured_data=content.structured_data,
            metadata=content.source
        )

        # Extract entities
        entity_extractor = EntityExtractor()
        entities = entity_extractor.extract(parsed_content)

        # Extract relationships
        relationship_extractor = RelationshipExtractor()
        relationships = relationship_extractor.extract(parsed_content, entities)

        # Store in Mem0
        mem0_service = services["mem0"]
        entity_ids = {}

        # Store entities
        for entity in entities:
            entity_id = mem0_service.store_entity(
                entity_type=entity["type"],
                properties=entity["properties"]
            )
            entity_ids[entity["id"]] = entity_id

        # Store relationships
        for relationship in relationships:
            mem0_service.store_relationship(
                from_id=entity_ids[relationship["from_id"]],
                to_id=entity_ids[relationship["to_id"]],
                relationship_type=relationship["type"],
                properties=relationship.get("properties", {})
            )

        return IngestResponse(
            status="success",
            processed_entities=len(entities),
            processed_relationships=len(relationships),
            details={
                "entity_types": {entity["type"] for entity in entities},
                "relationship_types": {rel["type"] for rel in relationships}
            }
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# api/routers/query.py
from fastapi import APIRouter, Depends, HTTPException, Request
from typing import Dict, Any

from ..models.request import QueryRequest
from ..models.response import QueryResponse, Source, SuggestedAction
from core.mem0.queries import get_query_handler

router = APIRouter(prefix="/chat_completion", tags=["Query"])

@router.post("/", response_model=QueryResponse)
async def process_query(
    request: QueryRequest,
    services: Dict[str, Any] = Depends()
):
    """Process user query and generate response."""
    try:
        # Get services
        mem0_service = services["mem0"]
        llm_service = services["llm"]
        embedding_service = services["embedding"]

        # Analyze query intent
        query_embedding = embedding_service.embed_text(request.query)
        query_intent = llm_service.classify_intent(request.query)

        # Get appropriate query handler
        query_handler = get_query_handler(query_intent)

        # Execute query against Mem0
        query_results = query_handler.execute(
            mem0_service=mem0_service,
            user_id=request.user_id,
            query=request.query,
            context=request.context
        )

        # Prepare context for LLM
        context = query_handler.prepare_context(query_results)

        # Generate response with LLM
        response_text = llm_service.generate_response(
            query=request.query,
            context=context
        )

        # Prepare sources
        sources = [
            Source(
                type=source["type"],
                id=source["id"],
                content=source["content"]
            )
            for source in query_results.get("sources", [])
        ]

        # Generate suggested actions if needed
        suggested_actions = []
        if query_intent == "unanswered_messages" and query_results.get("messages"):
            for message in query_results["messages"]:
                draft = llm_service.generate_reply_draft(
                    conversation=message.get("conversation_history", []),
                    last_message=message["text"]
                )

                suggested_actions.append(
                    SuggestedAction(
                        type="reply",
                        target=message["conversation_id"],
                        content=draft
                    )
                )

        return QueryResponse(
            response=response_text,
            sources=sources if sources else None,
            suggested_actions=suggested_actions if suggested_actions else None
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

2.7 Parser Implementation

# core/parsers/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional

class BaseParser(ABC):
    """Base class for content parsers."""

    @abstractmethod
    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse raw text content into structured data."""
        pass

# core/parsers/__init__.py
from typing import Dict, Any, Optional
from .base import BaseParser
from .whatsapp import WhatsAppParser
from .github import GitHubParser
from .generic import GenericParser

def get_parser(content_type: str, application: Optional[str] = None) -> BaseParser:
    """Get appropriate parser based on content type and application."""
    if content_type == "chat" and application == "WhatsApp":
        return WhatsAppParser()
    elif content_type in ["pr", "issue"] and application in ["GitHub", "GitLab"]:
        return GitHubParser()
    else:
        return GenericParser()

# core/parsers/whatsapp.py
import re
from datetime import datetime
from typing import Dict, Any, Optional, List
from .base import BaseParser

class WhatsAppParser(BaseParser):
    """Parser for WhatsApp conversations."""

    def __init__(self):
        # Regular expression for WhatsApp messages
        self.message_pattern = re.compile(r"^\[(\d{2}:\d{2}), (\d{1,2}/\d{1,2}/\d{4})\] (.+?): (.+)$", re.MULTILINE)

    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse WhatsApp conversation."""
        # Use structured data if available
        if structured_data and "messages" in structured_data:
            return {
                "type": "conversation",
                "application": "WhatsApp",
                "title": metadata.get("window_title", "WhatsApp Chat"),
                "messages": structured_data["messages"]
            }

        # Otherwise parse from raw text
        messages = []
        for match in self.message_pattern.finditer(raw_text):
            time_str, date_str, sender, text = match.groups()

            # Parse timestamp
            try:
                # Convert DD/MM/YYYY to YYYY-MM-DD
                date_parts = date_str.split("/")
                iso_date = f"{date_parts[2]}-{date_parts[1]}-{date_parts[0]}"
                timestamp = f"{iso_date}T{time_str}:00"
            except (IndexError, ValueError):
                # Fallback to current time if parsing fails
                timestamp = datetime.now().isoformat()

            messages.append({
                "sender": sender,
                "text": text,
                "timestamp": timestamp
            })

        return {
            "type": "conversation",
            "application": "WhatsApp",
            "title": metadata.get("window_title", "WhatsApp Chat"),
            "messages": messages
        }

# core/parsers/github.py
import re
from typing import Dict, Any, Optional, List
from .base import BaseParser

class GitHubParser(BaseParser):
    """Parser for GitHub content."""

    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse GitHub content."""
        # Use structured data if available
        if structured_data:
            return structured_data

        # Extract from URL if available
        url = metadata.get("url", "")
        if "github.com" in url and "/pull/" in url:
            return self._parse_pr_from_url(url, raw_text, metadata)

        # Fallback to generic parsing
        return self._parse_generic_github(raw_text, metadata)

    def _parse_pr_from_url(self, url: str, raw_text: str, metadata: Dict[str, str]) -> Dict[str, Any]:
        """Parse PR information from GitHub URL."""
        # Extract repository and PR number from URL
        # Example: https://github.com/owner/repo/pull/123
        match = re.search(r"github\.com/([^/]+)/([^/]+)/pull/(\d+)", url)
        if not match:
            return self._parse_generic_github(raw_text, metadata)

        owner, repo, pr_number = match.groups()

        # Extract PR title from window title
        title = metadata.get("window_title", "")
        if " by " in title:
            title = title.split(" by ")[0].strip()

        # Extract PR status
        status = "open"
        if "Merged" in raw_text:
            status = "merged"
        elif "Closed" in raw_text:
            status = "closed"

        # Extract reviewers (simplified)
        reviewers = []
        review_section = re.search(r"Reviewers(.*?)Assignees", raw_text, re.DOTALL)
        if review_section:
            reviewers_text = review_section.group(1)
            reviewers = [name.strip() for name in reviewers_text.split() if name.strip()]

        return {
            "type": "pull_request",
            "application": "GitHub",
            "repository": f"{owner}/{repo}",
            "pr_number": pr_number,
            "title": title,
            "url": url,
            "status": status,
            "reviewers": reviewers
        }

    def _parse_generic_github(self, raw_text: str, metadata: Dict[str, str]) -> Dict[str, Any]:
        """Fallback parser for generic GitHub content."""
        return {
            "type": "github_page",
            "application": "GitHub",
            "title": metadata.get("window_title", "GitHub"),
            "url": metadata.get("url", ""),
            "content": raw_text
        }

# core/parsers/generic.py
from typing import Dict, Any, Optional
from .base import BaseParser

class GenericParser(BaseParser):
    """Generic parser for unstructured content."""

    def parse(self, raw_text: str, structured_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Parse generic content."""
        # Use structured data if available
        if structured_data:
            return structured_data

        # Extract application from metadata
        application = metadata.get("application", "unknown")

        # Extract title
        title = metadata.get("window_title", "Untitled")

        # Determine content type based on available information
        content_type = "document"
        if "email" in application.lower() or "@" in title:
            content_type = "email"
        elif any(app in application.lower() for app in ["chat", "slack", "teams", "messenger"]):
            content_type = "conversation"
        elif any(app in application.lower() for app in ["jira", "trello", "asana"]):
            content_type = "task"

        return {
            "type": content_type,
            "application": application,
            "title": title,
            "url": metadata.get("url", ""),
            "content": raw_text
        }
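
To see the dispatch and parsing in action, the factory can be exercised directly with sample captures (the repository, names, and text below are made up):

from core.parsers import get_parser

# A WhatsApp capture is dispatched to WhatsAppParser
chat = get_parser("chat", application="WhatsApp").parse(
    "[09:15, 12/3/2025] Alice: Can you review my PR today?",
    metadata={"window_title": "Team Chat"},
)
print(chat["messages"][0]["sender"])   # Alice

# A GitHub PR page is dispatched to GitHubParser
pr = get_parser("pr", application="GitHub").parse(
    "Merged\nReviewers alice bob Assignees",
    metadata={
        "url": "https://github.com/acme/webapp/pull/123",
        "window_title": "Fix login bug by alice · Pull Request #123",
    },
)
print(pr["repository"], pr["status"])  # acme/webapp merged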

2.8 Entity Extraction Implementation