Setting Up GraphRAG with Mem0 in 6 Hours

Based on your requirements, I'll provide a complete implementation plan for building a GraphRAG system using Mem0's graph memory functionality. This solution will focus exclusively on Mem0 for the knowledge graph component without relying on other frameworks.

1. Project Structure (15 minutes)

graphmem/
├── app.py               # FastAPI application
├── ingest.py            # Ingestion logic
├── query.py             # Query processing
├── parsers/             # Application-specific parsers
│   ├── __init__.py
│   ├── base.py          # Base parser class
│   ├── whatsapp.py      # WhatsApp parser
│   ├── github.py        # GitHub parser
│   └── generic.py       # Generic content parser
├── memory/              # Memory management
│   ├── __init__.py
│   └── graph_memory.py  # Mem0 graph memory implementation
├── utils/               # Utility functions
│   ├── __init__.py
│   └── helpers.py       # Helper functions
├── config.py            # Configuration
├── requirements.txt     # Dependencies
└── demo.ipynb           # Demo notebook

2. Setting Up Dependencies (15 minutes)

Create a requirements.txt file:

mem0ai==0.2.4
fastapi==0.104.1
uvicorn==0.24.0
python-dotenv==1.0.0
pydantic==2.4.2
openai==1.3.0
pillow==10.1.0
pytesseract==0.3.10

Install the dependencies:

pip install -r requirements.txt

3. Implementing the Memory Layer with Mem0 (45 minutes)

# memory/graph_memory.py
from typing import Dict, Any, List, Optional, Tuple
import logging
from mem0 import Memory

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GraphMemory:
    """Wrapper around Mem0's memory for our application."""

    def __init__(self, user_id: str = "user123"):
        """
        Initialize the graph memory.

        Args:
            user_id: Identifier for the user
        """
        self.memory = Memory()
        self.user_id = user_id

        # Initialize a unique session for this instance
        self.session_id = f"session_{user_id}"

    def add_entity(self, entity: Dict[str, Any]) -> None:
        """
        Add an entity to the graph memory.

        Args:
            entity: Dictionary containing entity information
        """
        try:
            # Create a unique entity ID
            entity_id = f"{entity['type']}:{entity['entity']}"

            # Store entity in memory
            self.memory.add(
                content=f"Entity: {entity['entity']} is a {entity['type']}. {entity['description']}",
                user_id=self.user_id,
                metadata={
                    "type": "entity",
                    "entity_type": entity["type"],
                    "entity_id": entity_id,
                    "entity_name": entity["entity"],
                    "source": entity.get("source", "unknown"),
                    "description": entity["description"]
                },
                context_type="graph"
            )

            logger.info(f"Added entity: {entity['entity']}")

        except Exception as e:
            logger.error(f"Error adding entity: {str(e)}")
            raise

    def add_relationship(self, relationship: Dict[str, Any]) -> None:
        """
        Add a relationship to the graph memory.

        Args:
            relationship: Dictionary containing relationship information
        """
        try:
            # Create a unique relationship ID
            rel_id = f"{relationship['source']}_{relationship['relation']}_{relationship['target']}"

            # Store relationship in memory
            self.memory.add(
                content=f"Relationship: {relationship['source']} {relationship['relation']} {relationship['target']}. {relationship['description']}",
                user_id=self.user_id,
                metadata={
                    "type": "relationship",
                    "relationship_id": rel_id,
                    "source": relationship["source"],
                    "target": relationship["target"],
                    "relation": relationship["relation"],
                    "source_app": relationship.get("source_app", "unknown"),
                    "description": relationship["description"],
                    **{k: v for k, v in relationship.items() if k not in ["source", "target", "relation", "description", "source_app"]}
                },
                context_type="graph"
            )

            logger.info(f"Added relationship: {relationship['source']} {relationship['relation']} {relationship['target']}")

        except Exception as e:
            logger.error(f"Error adding relationship: {str(e)}")
            raise

    def search_entities(self, query: str, entity_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Search for entities in the graph memory.

        Args:
            query: Search query
            entity_type: Optional type to filter results

        Returns:
            List of matching entities
        """
        try:
            # Create filter based on entity type
            filter_dict = {"type": "entity"}
            if entity_type:
                filter_dict["entity_type"] = entity_type

            # Search memory
            results = self.memory.search(
                query,
                user_id=self.user_id,
                context_type="graph",
                metadata_filter=filter_dict
            )

            # Format results
            entities = []
            for result in results:
                metadata = result.metadata
                entities.append({
                    "entity": metadata.get("entity_name", ""),
                    "type": metadata.get("entity_type", ""),
                    "description": metadata.get("description", ""),
                    "source": metadata.get("source", "unknown"),
                    "score": result.score
                })

            return entities

        except Exception as e:
            logger.error(f"Error searching entities: {str(e)}")
            return []

    def search_relationships(self, query: str, relation_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Search for relationships in the graph memory.

        Args:
            query: Search query
            relation_type: Optional relation type to filter results

        Returns:
            List of matching relationships
        """
        try:
            # Create filter based on relation type
            filter_dict = {"type": "relationship"}
            if relation_type:
                filter_dict["relation"] = relation_type

            # Search memory
            results = self.memory.search(
                query,
                user_id=self.user_id,
                context_type="graph",
                metadata_filter=filter_dict
            )

            # Format results
            relationships = []
            for result in results:
                metadata = result.metadata
                relationships.append({
                    "source": metadata.get("source", ""),
                    "target": metadata.get("target", ""),
                    "relation": metadata.get("relation", ""),
                    "description": metadata.get("description", ""),
                    "source_app": metadata.get("source_app", "unknown"),
                    "score": result.score,
                    **{k: v for k, v in metadata.items() if k not in ["source", "target", "relation", "description", "source_app", "type", "relationship_id"]}
                })

            return relationships

        except Exception as e:
            logger.error(f"Error searching relationships: {str(e)}")
            return []

    def get_team_members(self) -> List[str]:
        """
        Get all team members from the graph memory.

        Returns:
            List of team member names
        """
        # Search for team member relationships
        relationships = self.search_relationships("team_member", "team_member")

        # Extract unique team members
        team_members = set()
        for rel in relationships:
            if rel["source"] != "User":
                team_members.add(rel["source"])
            if rel["target"] != "User":
                team_members.add(rel["target"])

        return list(team_members)

    def get_unreplied_messages(self) -> List[Dict[str, Any]]:
        """
        Get all unreplied messages from the graph memory.

        Returns:
            List of unreplied message details
        """
        # Search for unreplied message relationships
        relationships = self.search_relationships("unreplied message", "sent_unreplied_message")

        # Format the results
        unreplied_messages = []
        for rel in relationships:
            unreplied_messages.append({
                "sender": rel["source"],
                "message": rel.get("message_content", ""),
                "timestamp": rel.get("timestamp", ""),
                "source_app": rel["source_app"]
            })

        return unreplied_messages

    def get_prs_needing_review(self) -> List[Dict[str, Any]]:
        """
        Get all PRs needing review from the graph memory.

        Returns:
            List of PR details
        """
        # Search for PR review relationships
        relationships = self.search_relationships("PR needs review", "needs_review")

        # Format the results
        prs = []
        for rel in relationships:
            prs.append({
                "pr": rel["source"],
                "repository": rel.get("repository", ""),
                "url": rel.get("url", ""),
                "description": rel["description"]
            })

        return prs

4. Implementing the Base Parser (30 minutes)

# parsers/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Tuple

class BaseParser(ABC):
    """Base class for all content parsers."""

    @abstractmethod
    def parse(self, content: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
        """
        Parse content and extract entities and relationships.

        Args:
            content: Dict containing screen content

        Returns:
            Tuple of (entities, relationships)
            - entities: List of dicts with keys 'entity', 'type', 'description', 'source'
            - relationships: List of dicts with keys 'source', 'target', 'relation', 'description', 'source_app'
        """
        pass

    def get_app_type(self, content: Dict[str, Any]) -> str:
        """
        Determine the application type from content.

        Args:
            content: Dict containing screen content

        Returns:
            String identifying the application type
        """
        app_name = content.get('application', '').lower()

        if 'whatsapp' in app_name:
            return 'whatsapp'
        elif any(x in app_name for x in ['github', 'gitlab']):
            return 'github'
        else:
            return 'generic'

5. Implementing Application-Specific Parsers (1 hour)

Generic Parser

# parsers/generic.py
import re
from typing import Dict, Any, List, Tuple
from .base import BaseParser

class GenericParser(BaseParser):
    """Parser for generic content."""

    def parse(self, content: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
        """Parse generic content to extract entities and relationships."""
        entities = []
        relationships = []

        # Extract text content
        text = self._extract_text(content)

        # Extract potential entities (names, organizations)
        potential_entities = self._extract_potential_entities(text)

        for entity, entity_type in potential_entities:
            entities.append({
                'entity': entity,
                'type': entity_type,
                'description': f"Entity found in {content.get('application', 'unknown application')}",
                'source': content.get('application', 'unknown')
            })

        # Extract potential relationships (simplified)
        for i, entity1 in enumerate(potential_entities):
            for entity2 in potential_entities[i+1:]:
                if self._are_related(entity1[0], entity2[0], text):
                    relationships.append({
                        'source': entity1[0],
                        'target': entity2[0],
                        'relation': 'mentioned_with',
                        'description': f"Entities mentioned together in {content.get('application', 'unknown application')}",
                        'source_app': content.get('application', 'unknown')
                    })

        return entities, relationships

    def _extract_text(self, content: Dict[str, Any]) -> str:
        """Extract all text from content."""
        text = ""

        # Extract direct text
        if 'text' in content:
            text += content['text'] + " "

        # Extract text from elements
        if 'elements' in content:
            for element in content['elements']:
                if 'content' in element and isinstance(element['content'], str):
                    text += element['content'] + " "

        return text

    def _extract_potential_entities(self, text: str) -> List[Tuple[str, str]]:
        """Extract potential entities from text."""
        entities = []

        # Simple pattern for names (capitalized words)
        name_pattern = r'\\\\b[A-Z][a-z]+ [A-Z][a-z]+\\\\b'
        names = re.findall(name_pattern, text)
        for name in names:
            entities.append((name, 'PERSON'))

        # Simple pattern for organizations (capitalized words with non-space chars)
        org_pattern = r'\\\\b[A-Z][A-Za-z]+([ \\\\-][A-Z][A-Za-z]+)+\\\\b'
        orgs = re.findall(org_pattern, text)
        for org in orgs:
            if org not in names:  # Avoid duplicates
                entities.append((org, 'ORGANIZATION'))

        return entities

    def _are_related(self, entity1: str, entity2: str, text: str) -> bool:
        """Check if two entities are related in the text."""
        # Simple heuristic: check if entities appear within 50 characters of each other
        entity1_pos = text.find(entity1)
        entity2_pos = text.find(entity2)

        if entity1_pos == -1 or entity2_pos == -1:
            return False

        return abs(entity1_pos - entity2_pos) < 50

WhatsApp Parser

# parsers/whatsapp.py
from typing import Dict, Any, List, Tuple
from .base import BaseParser
import re

class WhatsAppParser(BaseParser):
    """Parser for WhatsApp content."""

    def parse(self, content: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
        """Parse WhatsApp content to extract entities and relationships."""
        entities = []
        relationships = []

        # Extract text content
        text = self._extract_text(content)

        # Extract chat participants
        participants = self._extract_participants(content)

        # Add participants as entities
        for participant in participants:
            entities.append({
                'entity': participant,
                'type': 'PERSON',
                'description': f"WhatsApp contact",
                'source': 'whatsapp'
            })

        # Extract messages
        messages = self._extract_messages(content)

        # Process messages to find unreplied ones
        for message in messages:
            if not message.get('replied', True):
                # Create relationship for unreplied message
                if message.get('sender') and message.get('sender') != 'User':
                    relationships.append({
                        'source': message.get('sender', 'Unknown'),
                        'target': 'User',
                        'relation': 'sent_unreplied_message',
                        'description': f"Unreplied message: {message.get('content', '')}",
                        'source_app': 'whatsapp',
                        'timestamp': message.get('timestamp'),
                        'message_content': message.get('content', '')
                    })

        return entities, relationships

    def _extract_text(self, content: Dict[str, Any]) -> str:
        """Extract all text from content."""
        text = ""

        # Extract direct text
        if 'text' in content:
            text += content['text'] + " "

        # Extract text from elements
        if 'elements' in content:
            for element in content['elements']:
                if 'content' in element and isinstance(element['content'], str):
                    text += element['content'] + " "

        return text

    def _extract_participants(self, content: Dict[str, Any]) -> List[str]:
        """Extract chat participants from WhatsApp content."""
        participants = []

        # Extract from header elements
        if 'elements' in content:
            for element in content['elements']:
                if element.get('type') == 'header' and 'content' in element:
                    # Typically the chat header contains the name of the contact or group
                    participants.append(element['content'])

        # Extract from message senders
        messages = self._extract_messages(content)
        for message in messages:
            if 'sender' in message and message['sender'] not in participants and message['sender'] != 'User':
                participants.append(message['sender'])

        return participants

    def _extract_messages(self, content: Dict[str, Any]) -> List[Dict]:
        """Extract messages from WhatsApp content."""
        messages = []

        # Look for message elements
        if 'elements' in content:
            for element in content['elements']:
                if element.get('type') == 'message':
                    message = {
                        'sender': element.get('sender', 'Unknown'),
                        'content': element.get('content', ''),
                        'timestamp': element.get('timestamp'),
                        'replied': self._is_replied(element)
                    }
                    messages.append(message)

        return messages

    def _is_replied(self, message_element: Dict) -> bool:
        """Determine if a message has been replied to."""
        # Check for reply indicators in the message element
        if 'attributes' in message_element:
            attrs = message_element['attributes']
            if 'replied' in attrs:
                return attrs['replied']

            # Check for visual indicators that might suggest a reply
            if 'has_reply_indicator' in attrs:
                return attrs['has_reply_indicator']

        # Default to True (assuming replied) to avoid false positives
        return True

GitHub Parser