Based on the proposal, I'll create an end-to-end POC implementation of the Contextual AI Assistant using Mem0. Let me build this step by step.

Contextual AI Assistant: End-to-End POC Implementation

I'll create a working prototype that demonstrates how to implement the proposed system using Mem0's hybrid memory architecture. This POC will focus on:

  1. Setting up the core infrastructure
  2. Implementing the ingestion pipeline
  3. Building the memory system
  4. Creating the query processing logic
  5. Demonstrating with example scenarios

1. Setting Up the Environment

First, let's set up our development environment with the necessary dependencies:

# contextual_assistant_poc.py

import os
import json
import datetime
import uuid
from typing import Dict, List, Any, Optional
import numpy as np
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import uvicorn
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Initialize FastAPI app
app = FastAPI(title="Contextual AI Assistant")

# Mock Mem0 client since we're building a POC
class MockMem0Client:
    def __init__(self):
        # Initialize our storage systems
        self.vector_store = {}  # Simulates vector database
        self.key_value_store = {}  # Simulates key-value store
        self.graph_store = {}  # Simulates graph database

        # Track relationships between entities
        self.relationships = []

    def add(self, entry: Dict[str, Any], user_id: str):
        """Add data to all appropriate stores"""
        # Generate unique ID for the entry
        entry_id = str(uuid.uuid4())

        # Store in vector store (in a real system, we'd compute embeddings)
        if "raw_content" in entry:
            self.vector_store[entry_id] = {
                "content": entry["raw_content"],
                "metadata": {k: v for k, v in entry.items() if k != "raw_content"},
                "user_id": user_id,
                "timestamp": entry.get("timestamp", datetime.datetime.now().isoformat())
            }

        # Store entities in key-value store
        if "entities" in entry:
            for entity in entry["entities"]:
                entity_id = entity.get("id", str(uuid.uuid4()))
                self.key_value_store[entity_id] = {
                    **entity,
                    "user_id": user_id,
                    "entry_id": entry_id,
                    "timestamp": entry.get("timestamp", datetime.datetime.now().isoformat())
                }

        # Store relationships in graph store
        if "relationships" in entry:
            for rel in entry["relationships"]:
                rel_id = str(uuid.uuid4())
                self.graph_store[rel_id] = {
                    **rel,
                    "user_id": user_id,
                    "entry_id": entry_id,
                    "timestamp": entry.get("timestamp", datetime.datetime.now().isoformat())
                }
                self.relationships.append(rel)

        return entry_id

    def search(self, query: str, user_id: str, context_type: str = "vector",
               filters: Optional[Dict[str, Any]] = None, limit: int = 10):
        """Search for relevant information across stores"""
        results = []
        filters = filters or {}

        if context_type == "vector":
            # Simulate vector search with simple keyword matching
            for entry_id, entry in self.vector_store.items():
                if user_id != entry["user_id"]:
                    continue

                # Simple keyword matching (in a real system, we'd use embeddings)
                score = self._simple_similarity(query, entry["content"])

                # Apply filters
                if filters and not self._matches_filters(entry["metadata"], filters):
                    continue

                if score > 0.2:  # Arbitrary threshold
                    results.append({
                        "id": entry_id,
                        "content": entry["content"],
                        "score": score,
                        "metadata": entry["metadata"],
                        "timestamp": entry["timestamp"]
                    })

        elif context_type == "key_value":
            # Direct lookup with filters
            for entity_id, entity in self.key_value_store.items():
                if user_id != entity["user_id"]:
                    continue

                if filters and not self._matches_filters(entity, filters):
                    continue

                # Simple keyword matching on entity attributes
                score = 0
                for k, v in entity.items():
                    if isinstance(v, str) and any(term.lower() in v.lower() for term in query.split()):
                        score += 0.3

                if score > 0.2:  # Arbitrary threshold
                    results.append({
                        "id": entity_id,
                        **entity,
                        "score": score
                    })

        elif context_type == "graph":
            # Search relationships
            for rel_id, rel in self.graph_store.items():
                if user_id != rel["user_id"]:
                    continue

                if filters and not self._matches_filters(rel, filters):
                    continue

                # Simple keyword matching on relationship attributes
                score = 0
                for k, v in rel.items():
                    if isinstance(v, str) and any(term.lower() in v.lower() for term in query.split()):
                        score += 0.3

                if score > 0.2:  # Arbitrary threshold
                    results.append({
                        "id": rel_id,
                        **rel,
                        "score": score
                    })

                    # Also include connected entities
                    source_id = rel.get("source")
                    target_id = rel.get("target")

                    if source_id in self.key_value_store:
                        results.append({
                            "id": source_id,
                            **self.key_value_store[source_id],
                            "score": score * 0.9,
                            "connection": "source"
                        })

                    if target_id in self.key_value_store:
                        results.append({
                            "id": target_id,
                            **self.key_value_store[target_id],
                            "score": score * 0.9,
                            "connection": "target"
                        })

        # Sort by score and limit results
        results.sort(key=lambda x: x.get("score", 0), reverse=True)
        return results[:limit]

    def get(self, entity_id: str, user_id: str, context_type: str = "key_value"):
        """Get a specific entity by ID"""
        if context_type == "key_value" and entity_id in self.key_value_store:
            entity = self.key_value_store[entity_id]
            if entity["user_id"] == user_id:
                return entity
        return None

    def get_many(self, entity_ids: List[str], user_id: str, context_type: str = "key_value"):
        """Get multiple entities by ID"""
        results = []
        for entity_id in entity_ids:
            entity = self.get(entity_id, user_id, context_type)
            if entity:
                results.append(entity)
        return results

    def _simple_similarity(self, query: str, text: str) -> float:
        """Simple text similarity function (placeholder for vector similarity)"""
        query_terms = set(query.lower().split())
        text_terms = set(text.lower().split())

        # Jaccard similarity
        intersection = query_terms.intersection(text_terms)
        union = query_terms.union(text_terms)

        if not union:
            return 0.0

        return len(intersection) / len(union)

    def _matches_filters(self, item: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """Check if an item matches the given filters"""
        for k, v in filters.items():
            # Handle nested keys with dot notation
            if "." in k:
                parts = k.split(".")
                current = item
                for part in parts[:-1]:
                    if part not in current:
                        return False
                    current = current[part]

                if parts[-1] not in current or current[parts[-1]] != v:
                    return False
            # Handle direct keys
            elif k not in item or item[k] != v:
                return False
        return True

# Initialize our mock Mem0 client
memory = MockMem0Client()
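
To sanity-check the client before wiring it into the API, you can exercise it directly from a Python shell. The entry below is a hand-written example of the shape the ingestion pipeline in Section 3 will produce; the user ID, entity IDs, and text are made up for illustration:

# Illustrative smoke test of the mock client (fabricated IDs, text, and user)
sample_entry = {
    "app_type": "messaging",
    "raw_content": "Alice: Are we still on for the team sync tomorrow?",
    "entities": [
        {"type": "person", "id": "person_alice_demo", "name": "Alice"},
        {"type": "message", "id": "message_demo_1",
         "content": "Are we still on for the team sync tomorrow?",
         "sender": "Alice", "status": "unreplied"},
    ],
    "relationships": [
        {"source": "person_alice_demo", "target": "message_demo_1", "type": "authored"}
    ],
}

entry_id = memory.add(sample_entry, user_id="demo_user")

# Vector-style lookup: simple keyword overlap against the stored raw content
print(memory.search("are we still on for the team sync tomorrow?",
                    user_id="demo_user", context_type="vector"))

# Key-value lookup: filter directly on entity attributes
print(memory.search("unreplied", user_id="demo_user",
                    context_type="key_value", filters={"status": "unreplied"}))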

2. Defining Data Models

Now, let's define the data models for our API endpoints:

# Data models for API endpoints
class ScreenData(BaseModel):
    """Model for screen data sent to the ingestion API"""
    content: str
    metadata: Optional[Dict[str, Any]] = None

class ChatQuery(BaseModel):
    """Model for queries sent to the chat completion API"""
    query: str
    user_id: str

class ChatResponse(BaseModel):
    """Model for responses from the chat completion API"""
    response: str
    sources: Optional[List[Dict[str, Any]]] = None
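
For reference, request bodies matching these models might look like the following; the values are illustrative only:

# Illustrative request bodies matching the models above (values are made up)
example_screen_payload = {
    "content": "GitHub\nFix login timeout\nAlex commented: Looks good, please add a retry test",
    "metadata": {"app_name": "github"},  # optional; the ingestion pipeline falls back to keyword matching if absent
}

example_chat_payload = {
    "query": "Which PRs need my review?",
    "user_id": "demo_user",
}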

3. Implementing the Ingestion Pipeline

Next, let's implement the ingestion pipeline that processes screen data:

# Application classification and entity extraction

def classify_application(content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
    """Classify the application type based on screen content and metadata"""
    # In a real system, we'd use more sophisticated classification
    # For this POC, we'll use simple keyword matching

    content_lower = content.lower()

    if metadata and "app_name" in metadata:
        return metadata["app_name"]

    if any(term in content_lower for term in ["message", "chat", "whatsapp", "telegram"]):
        return "messaging"

    if any(term in content_lower for term in ["github", "pull request", "pr", "commit", "code review"]):
        return "github"

    if any(term in content_lower for term in ["jira", "ticket", "issue", "story", "epic"]):
        return "jira"

    if any(term in content_lower for term in ["email", "subject", "inbox", "sent"]):
        return "email"

    return "unknown"

def extract_entities(content: str, app_type: str) -> List[Dict[str, Any]]:
    """Extract entities from screen content based on app type"""
    entities = []

    # In a real system, we'd use NER models or more sophisticated extraction
    # For this POC, we'll use simple regex and rules

    if app_type == "messaging":
        # Extract people and messages
        lines = content.split("\n")
        seen_senders: Dict[str, str] = {}  # sender name -> person entity id

        for line in lines:
            if ":" in line:
                parts = line.split(":", 1)
                if len(parts) == 2:
                    sender = parts[0].strip()
                    message = parts[1].strip()

                    # Add a person entity only the first time we see this sender
                    if sender not in seen_senders:
                        seen_senders[sender] = f"person_{uuid.uuid4()}"
                        entities.append({
                            "type": "person",
                            "id": seen_senders[sender],
                            "name": sender
                        })

                    # Add message entity
                    message_id = f"message_{uuid.uuid4()}"
                    entities.append({
                        "type": "message",
                        "id": message_id,
                        "content": message,
                        "sender": sender,
                        "status": "unreplied" if sender != "Me" and "?" in message else "normal"
                    })

    elif app_type == "github":
        # Extract PRs, comments, and people
        if "pull request" in content.lower() or "pr" in content.lower():
            # Extract PR details
            pr_id = f"pr_{uuid.uuid4()}"

            # Basic extraction of PR title
            title_match = None
            for line in content.split("\n"):
                if line.strip() and not title_match:
                    title_match = line.strip()
                    break

            entities.append({
                "type": "pull_request",
                "id": pr_id,
                "title": title_match or "Untitled PR",
                "review_status": "pending" if "needs review" in content.lower() else "reviewed"
            })

            # Extract comments
            comment_lines = [line for line in content.split("\n") if "commented:" in line.lower()]
            for line in comment_lines:
                parts = line.split("commented:", 1)
                if len(parts) == 2:
                    commenter = parts[0].strip()
                    comment = parts[1].strip()

                    # Add person entity
                    person_id = f"person_{uuid.uuid4()}"
                    entities.append({
                        "type": "person",
                        "id": person_id,
                        "name": commenter
                    })

                    # Add comment entity
                    comment_id = f"comment_{uuid.uuid4()}"
                    entities.append({
                        "type": "comment",
                        "id": comment_id,
                        "content": comment,
                        "author": commenter
                    })

    # Extract generic entities for all app types

    # Extract potential team mentions
    team_mentions = []
    if "team" in content.lower():
        for line in content.split("\n"):
            if "team" in line.lower():
                team_mentions.append(line.strip())

    for mention in team_mentions:
        team_id = f"team_{uuid.uuid4()}"
        entities.append({
            "type": "team",
            "id": team_id,
            "name": mention
        })

    return entities

def identify_relationships(entities: List[Dict[str, Any]], app_type: str) -> List[Dict[str, Any]]:
    """Identify relationships between extracted entities"""
    relationships = []

    # Group entities by type for easier processing
    entities_by_type = {}
    for entity in entities:
        entity_type = entity["type"]
        if entity_type not in entities_by_type:
            entities_by_type[entity_type] = []
        entities_by_type[entity_type].append(entity)

    # Process relationships based on app type
    if app_type == "messaging":
        # Connect messages to senders
        messages = entities_by_type.get("message", [])
        people = {p["name"]: p for p in entities_by_type.get("person", [])}

        for message in messages:
            sender_name = message.get("sender")
            if sender_name in people:
                sender = people[sender_name]
                relationships.append({
                    "source": sender["id"],
                    "target": message["id"],
                    "type": "authored"
                })

                # If the message is not from "Me", create a received_by relationship
                if sender_name != "Me":
                    relationships.append({
                        "source": message["id"],
                        "target": "me",
                        "type": "received_by"
                    })
                else:
                    # Messages from "Me" are sent_to the previous message's sender
                    # This is a simplification - in a real system we'd track conversation threads
                    for prev_message in messages:
                        if prev_message["sender"] != "Me" and prev_message["id"] != message["id"]:
                            relationships.append({
                                "source": message["id"],
                                "target": people.get(prev_message["sender"], {}).get("id"),
                                "type": "sent_to"
                            })
                            break

    elif app_type == "github":
        # Connect PRs, comments, and people
        prs = entities_by_type.get("pull_request", [])
        comments = entities_by_type.get("comment", [])
        people = {p["name"]: p for p in entities_by_type.get("person", [])}

        for comment in comments:
            author_name = comment.get("author")
            if author_name in people:
                author = people[author_name]
                relationships.append({
                    "source": author["id"],
                    "target": comment["id"],
                    "type": "authored"
                })

                # Connect comment to PR (simplification - assumes one PR per screen)
                if prs:
                    relationships.append({
                        "source": comment["id"],
                        "target": prs[0]["id"],
                        "type": "comments_on"
                    })

    # Process team relationships
    teams = entities_by_type.get("team", [])
    for team in teams:
        # Connect every extracted person to every team seen on the same screen
        # This is a simplification - in a real system we'd use more sophisticated methods
        for person in entities_by_type.get("person", []):
            relationships.append({
                "source": person["id"],
                "target": team["id"],
                "type": "member_of"
            })

    return relationships

def current_time():
    """Get current time in ISO format"""
    return datetime.datetime.now().isoformat()

def summarize_content(content: str) -> str:
    """Create a summary of the content for storage"""
    # In a real system, we'd use an LLM to generate a summary
    # For this POC, we'll just return the first 100 characters
    return content[:min(len(content), 100)]

@app.post("/ingest")
async def ingest_screen_data(screen_data: ScreenData, request: Request):
    """Ingest screen data from client"""
    # Extract user ID from request (in a real system, this would come from authentication)
    user_id = request.headers.get("X-User-ID", "default_user")

    # Process the screen data
    content = screen_data.content
    metadata = screen_data.metadata or {}

    # Classify application type
    app_type = classify_application(content, metadata)

    # Extract entities
    entities = extract_entities(content, app_type)

    # Identify relationships
    relationships = identify_relationships(entities, app_type)

    # Create memory entry
    memory_entry = {
        "app_type": app_type,
        "timestamp": current_time(),
        "entities": entities,
        "relationships": relationships,
        "raw_content": content,
        "metadata": metadata
    }

    # Store in memory system
    entry_id = memory.add(memory_entry, user_id)

    return {"status": "success", "entry_id": entry_id, "app_type": app_type}

4. Implementing the Query Processing

With the ingestion pipeline in place, let's implement the query processing logic:

# Mock LLM for generating responses
def mock_llm(prompt: str, context: List[Dict[str, Any]] = None) -> str:
    """Mock LLM function to generate responses based on context"""
    # In a real system, we'd call an actual LLM API
    # For this POC, we'll use templated responses

    context = context or []

    # Team identification query
    if "team" in prompt.lower() and "who" in prompt.lower():
        if not context:
            return "I couldn't find information about your team members."

        team_members = []
        for item in context:
            if item.get("type") == "person" and "member_of" in str(item):
                team_members.append(item.get("name", "Unknown"))

            if item.get("type") == "relationship" and item.get("type") == "member_of":
                person_id = item.get("source")
                for c in context:
                    if c.get("id") == person_id and c.get("name") not in team_members:
                        team_members.append(c.get("name", "Unknown"))

        if team_members:
            return f"Based on your screen history, your team includes: {', '.join(team_members)}"
        else:
            return "I couldn't identify specific team members from your screen history."

    # Unreplied messages query
    if "message" in prompt.lower() and "haven't replied" in prompt.lower():
        if not context:
            return "I couldn't find any unreplied messages."

        unreplied = []
        for item in context:
            if item.get("type") == "message" and item.get("status") == "unreplied":
                sender = item.get("sender", "Unknown")
                content = item.get("content", "")
                unreplied.append({"sender": sender, "content": content})

        if unreplied:
            response = "Here are the messages you haven't replied to:\\\\n\\\\n"
            for msg in unreplied:
                response += f"From {msg['sender']}: \\\\"{msg['content']}\\\\"\\\\n"
                response += f"Suggested reply: \\\\"Thank you for your message. I'll get back to you soon.\\\\"\\\\n\\\\n"
            return response
        else:
            return "I couldn't find any unreplied messages in your chat history."

    # PR reviews query
    if "pr" in prompt.lower() and "review" in prompt.lower():
        if not context:
            return "I couldn't find any PRs that need your review."

        pending_prs = []
        for item in context:
            if item.get("type") == "pull_request" and item.get("review_status") == "pending":
                title = item.get("title", "Untitled PR")
                pending_prs.append({"title": title})

        if pending_prs:
            response = "Here are the PRs that need your review:\\\\n\\\\n"
            for pr in pending_prs:
                response += f"- {pr['title']}\\\\n"
            return response
        else:
            return "I couldn't find any PRs that need your review in your screen history."

    # Default response
    return "I'm not sure how to answer that based on your screen history. Can you rephrase your question?"

def analyze_query(query: str) -> Dict[str, Any]:
    """Analyze the query to determine search strategy"""
    query_lower = query.lower()

    # Team identification query
    if "team" in query_lower and any(term in query_lower for term in ["who", "people", "members"]):
        return {
            "type": "team_identification",
            "search_strategy": [
                {"context_type": "graph", "query": "team members", "filters": {}},
                {"context_type": "vector", "query": "team members colleagues", "filters": {}}
            ]
        }

    # Unreplied messages query
    if "message" in query_lower and "haven't replied" in query_lower:
        app_filter = {}
        if "whatsapp" in query_lower:
            app_filter = {"app_type": "messaging"}

        return {
            "type": "unreplied_messages",
            "search_strategy": [
                {"context_type": "key_value", "query": "unreplied messages", "filters": {"status": "unreplied", **app_filter}},
                {"context_type": "vector", "query": "unreplied messages", "filters": app_filter}
            ]
        }

    # PR reviews query
    if any(term in query_lower for term in ["pr", "pull request"]) and "review" in query_lower:
        return {
            "type": "pending_reviews",
            "search_strategy": [
                {"context_type": "key_value", "query": "pending PR reviews", "filters": {"type": "pull_request", "review_status": "pending"}},
                {"context_type": "vector", "query": "pull requests that need review", "filters": {"app_type": "github"}}
            ]
        }

    # Default search strategy
    return {
        "type": "general",
        "search_strategy": [
            {"context_type": "vector", "query": query, "filters": {}}
        ]
    }
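
# For example (illustrative), analyze_query("Which PRs need my review?") returns:
# {
#     "type": "pending_reviews",
#     "search_strategy": [
#         {"context_type": "key_value", "query": "pending PR reviews",
#          "filters": {"type": "pull_request", "review_status": "pending"}},
#         {"context_type": "vector", "query": "pull requests that need review",
#          "filters": {"app_type": "github"}}
#     ]
# }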

@app.post("/chat_completion")
async def chat_completion(chat_query: ChatQuery):
    """Process user query and generate response"""
    query = chat_query.query
    user_id = chat_query.user_id

    # Analyze the query
    analysis = analyze_query(query)
    query_type = analysis["type"]
    search_strategy = analysis["search_strategy"]

    # Collect context from memory
    context = []
    for strategy in search_strategy:
        results = memory.search(
            query=strategy["query"],
            user_id=user_id,
            context_type=strategy["context_type"],
            filters=strategy["filters"]
        )
        context.extend(results)

    # Generate response using LLM
    response = mock_llm(query, context)

    # Prepare sources
    sources = []
    for item in context:
        if "id" in item and "type" in item:
            sources.append({
                "id": item["id"],
                "type": item["type"],
                "score": item.get("score", 0)
            })

    return {"response": response, "sources": sources[:5]}