Based on the proposal, I'll create an end-to-end POC implementation of the Contextual AI Assistant using Mem0. Let me build this step by step.
I'll create a working prototype that demonstrates how to implement the proposed system using Mem0's hybrid memory architecture. This POC focuses on three parts: a mock hybrid memory store (vector, key-value, and graph), an ingestion pipeline that classifies screens and extracts entities and relationships, and a query pipeline that retrieves context and generates responses.
First, let's set up our development environment with the necessary dependencies:
# contextual_assistant_poc.py
import datetime
import json
import os
import re
import uuid
from typing import Any, Dict, List, Optional

import numpy as np
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
# Load environment variables
load_dotenv()  # pulls configuration (API keys, etc.) from a local .env file, if present
# Initialize FastAPI app
app = FastAPI(title="Contextual AI Assistant")  # module-level ASGI app; endpoints are registered on it below
# Mock Mem0 client since we're building a POC
class MockMem0Client:
    """In-memory stand-in for Mem0's hybrid memory architecture.

    Simulates the three backing stores (vector, key-value, graph) with plain
    dicts so the POC runs without external services. Every lookup is scoped
    by ``user_id``.
    """

    def __init__(self):
        # Initialize our storage systems.
        self.vector_store = {}     # simulates a vector database (entry_id -> document)
        self.key_value_store = {}  # simulates a key-value store (entity_id -> entity)
        self.graph_store = {}      # simulates a graph database (rel_id -> edge)
        # Flat list of every relationship ever added, across all users.
        self.relationships = []

    def add(self, entry: Dict[str, Any], user_id: str) -> str:
        """Add *entry* to all appropriate stores and return its new entry id.

        Recognised keys: ``raw_content`` (vector store), ``entities``
        (key-value store) and ``relationships`` (graph store). ``timestamp``
        defaults to now; it is computed once so every record written for a
        single entry carries the same timestamp.
        """
        entry_id = str(uuid.uuid4())
        timestamp = entry.get("timestamp", datetime.datetime.now().isoformat())
        # Store in vector store (a real system would compute embeddings here).
        if "raw_content" in entry:
            self.vector_store[entry_id] = {
                "content": entry["raw_content"],
                "metadata": {k: v for k, v in entry.items() if k != "raw_content"},
                "user_id": user_id,
                "timestamp": timestamp,
            }
        # Store entities in the key-value store.
        for entity in entry.get("entities", []):
            entity_id = entity.get("id", str(uuid.uuid4()))
            self.key_value_store[entity_id] = {
                **entity,
                "user_id": user_id,
                "entry_id": entry_id,
                "timestamp": timestamp,
            }
        # Store relationships in the graph store.
        for rel in entry.get("relationships", []):
            rel_id = str(uuid.uuid4())
            self.graph_store[rel_id] = {
                **rel,
                "user_id": user_id,
                "entry_id": entry_id,
                "timestamp": timestamp,
            }
            self.relationships.append(rel)
        return entry_id

    def search(self, query: str, user_id: str, context_type: str = "vector",
               filters: Optional[Dict[str, Any]] = None, limit: int = 10):
        """Search for relevant information in one of the stores.

        ``context_type`` selects the store ("vector", "key_value" or "graph").
        Results are restricted to *user_id*, optionally narrowed by *filters*,
        scored, sorted by descending score and truncated to *limit*.
        """
        results = []
        filters = filters or {}
        if context_type == "vector":
            # Simulate vector search with simple keyword matching.
            for entry_id, entry in self.vector_store.items():
                if entry["user_id"] != user_id:
                    continue
                if filters and not self._matches_filters(entry["metadata"], filters):
                    continue
                score = self._simple_similarity(query, entry["content"])
                if score > 0.2:  # arbitrary relevance threshold
                    results.append({
                        "id": entry_id,
                        "content": entry["content"],
                        "score": score,
                        "metadata": entry["metadata"],
                        "timestamp": entry["timestamp"],
                    })
        elif context_type == "key_value":
            # Direct scan with filters plus keyword scoring on attributes.
            for entity_id, entity in self.key_value_store.items():
                if entity["user_id"] != user_id:
                    continue
                if filters and not self._matches_filters(entity, filters):
                    continue
                score = self._keyword_score(query, entity)
                if score > 0.2:  # arbitrary relevance threshold
                    results.append({"id": entity_id, **entity, "score": score})
        elif context_type == "graph":
            # Search relationships, surfacing the entities a matching edge
            # connects. Fix: connected entities are only appended for edges
            # that actually matched; previously they were appended for every
            # scanned edge (with score 0), polluting the result list.
            for rel_id, rel in self.graph_store.items():
                if rel["user_id"] != user_id:
                    continue
                if filters and not self._matches_filters(rel, filters):
                    continue
                score = self._keyword_score(query, rel)
                if score <= 0.2:  # arbitrary relevance threshold
                    continue
                results.append({"id": rel_id, **rel, "score": score})
                for role in ("source", "target"):
                    node_id = rel.get(role)
                    if node_id in self.key_value_store:
                        results.append({
                            "id": node_id,
                            **self.key_value_store[node_id],
                            "score": score * 0.9,  # slightly discounted vs. the edge itself
                            "connection": role,
                        })
        # Sort by score and limit results.
        results.sort(key=lambda x: x.get("score", 0), reverse=True)
        return results[:limit]

    def _keyword_score(self, query: str, item: Dict[str, Any]) -> float:
        """Score *item* by how many of its string fields mention a query term."""
        terms = [t.lower() for t in query.split()]
        score = 0.0
        for value in item.values():
            if isinstance(value, str) and any(t in value.lower() for t in terms):
                score += 0.3
        return score

    def get(self, entity_id: str, user_id: str, context_type: str = "key_value"):
        """Get a specific entity by id, or None if missing or owned by another user.

        Only the key-value store is supported; any other ``context_type``
        returns None.
        """
        if context_type == "key_value":
            entity = self.key_value_store.get(entity_id)
            if entity is not None and entity["user_id"] == user_id:
                return entity
        return None

    def get_many(self, entity_ids: List[str], user_id: str, context_type: str = "key_value"):
        """Get multiple entities by id, silently skipping missing ones."""
        results = []
        for entity_id in entity_ids:
            entity = self.get(entity_id, user_id, context_type)
            if entity:
                results.append(entity)
        return results

    def _simple_similarity(self, query: str, text: str) -> float:
        """Jaccard similarity over whitespace tokens (vector-similarity stand-in)."""
        query_terms = set(query.lower().split())
        text_terms = set(text.lower().split())
        union = query_terms | text_terms
        if not union:
            return 0.0
        return len(query_terms & text_terms) / len(union)

    def _matches_filters(self, item: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """Check if *item* satisfies every filter; dotted keys traverse nesting.

        Fix: traversal now guards against non-dict intermediates, which
        previously raised TypeError instead of returning False.
        """
        for key, expected in filters.items():
            if "." in key:
                parts = key.split(".")
                current = item
                for part in parts[:-1]:
                    if not isinstance(current, dict) or part not in current:
                        return False
                    current = current[part]
                if (not isinstance(current, dict)
                        or parts[-1] not in current
                        or current[parts[-1]] != expected):
                    return False
            elif key not in item or item[key] != expected:
                return False
        return True
# Initialize our mock Mem0 client
# Module-level singleton shared by the ingestion and chat endpoints below.
memory = MockMem0Client()
Now, let's define the data models for our API endpoints:
# Data models for API endpoints
class ScreenData(BaseModel):
    """Model for screen data sent to the ingestion API"""
    # Raw text captured from the user's screen.
    content: str
    # Optional capture metadata (e.g. "app_name") used to aid classification.
    metadata: Optional[Dict[str, Any]] = None
class ChatQuery(BaseModel):
    """Model for queries sent to the chat completion API"""
    # Natural-language question from the user.
    query: str
    # Identifier scoping which user's memories are searched.
    user_id: str
class ChatResponse(BaseModel):
    """Model for responses from the chat completion API"""
    # Generated answer text.
    response: str
    # Context items (id/type/score) that grounded the response.
    sources: Optional[List[Dict[str, Any]]] = None
Next, let's implement the ingestion pipeline that processes screen data:
# Application classification and entity extraction
def classify_application(content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
    """Classify the application type based on screen content and metadata.

    Precedence: an explicit ``app_name`` in *metadata* wins; otherwise the
    first matching keyword family (messaging, github, jira, email) decides;
    falls back to "unknown".
    """
    # In a real system we'd use more sophisticated classification. For this
    # POC we use keyword matching, but the short token "pr" is matched on
    # word boundaries so words like "approve" or "present" do not
    # misclassify a screen as GitHub.
    content_lower = content.lower()
    if metadata and "app_name" in metadata:
        return metadata["app_name"]
    if any(term in content_lower for term in ["message", "chat", "whatsapp", "telegram"]):
        return "messaging"
    if ("github" in content_lower or "pull request" in content_lower
            or "commit" in content_lower or "code review" in content_lower
            or re.search(r"\bprs?\b", content_lower)):
        return "github"
    if any(term in content_lower for term in ["jira", "ticket", "issue", "story", "epic"]):
        return "jira"
    if any(term in content_lower for term in ["email", "subject", "inbox", "sent"]):
        return "email"
    return "unknown"
def extract_entities(content: str, app_type: str) -> List[Dict[str, Any]]:
    """Extract entities from screen content based on app type.

    Returns a flat list of entity dicts (person / message / pull_request /
    comment / team), each carrying a freshly generated unique ``id``.
    """
    # In a real system we'd use NER models or more sophisticated extraction;
    # for this POC we use line splitting and simple rules. Note: content is
    # split on real newlines ("\n"); the previous literal "\\n" separator was
    # an escaping artifact and never matched actual screen text.
    entities: List[Dict[str, Any]] = []
    if app_type == "messaging":
        # Extract people and messages from "Sender: text" lines.
        seen_senders = set()  # emit each person at most once
        for line in content.split("\n"):
            if ":" not in line:
                continue
            sender, _, message = line.partition(":")
            sender = sender.strip()
            message = message.strip()
            if sender not in seen_senders:
                seen_senders.add(sender)
                entities.append({
                    "type": "person",
                    "id": f"person_{uuid.uuid4()}",
                    "name": sender,
                })
            entities.append({
                "type": "message",
                "id": f"message_{uuid.uuid4()}",
                "content": message,
                "sender": sender,
                # A question from someone else that we haven't answered yet.
                "status": "unreplied" if sender != "Me" and "?" in message else "normal",
            })
    elif app_type == "github":
        content_lower = content.lower()
        # Extract the PR itself. "pr" is matched on word boundaries so words
        # like "approve" do not create phantom pull requests.
        if "pull request" in content_lower or re.search(r"\bprs?\b", content_lower):
            # Use the first non-empty line as a best-effort PR title.
            title = next((ln.strip() for ln in content.split("\n") if ln.strip()), None)
            entities.append({
                "type": "pull_request",
                "id": f"pr_{uuid.uuid4()}",
                "title": title or "Untitled PR",
                "review_status": "pending" if "needs review" in content_lower else "reviewed",
            })
        # Extract comments and their authors from "X commented: ..." lines.
        for line in content.split("\n"):
            if "commented:" not in line.lower():
                continue
            parts = line.split("commented:", 1)
            if len(parts) != 2:
                continue  # case-variant "Commented:" lines are skipped, as before
            commenter = parts[0].strip()
            entities.append({
                "type": "person",
                "id": f"person_{uuid.uuid4()}",
                "name": commenter,
            })
            entities.append({
                "type": "comment",
                "id": f"comment_{uuid.uuid4()}",
                "content": parts[1].strip(),
                "author": commenter,
            })
    # Generic extraction for all app types: lines that mention a team.
    if "team" in content.lower():
        for line in content.split("\n"):
            if "team" in line.lower():
                entities.append({
                    "type": "team",
                    "id": f"team_{uuid.uuid4()}",
                    "name": line.strip(),
                })
    return entities
def identify_relationships(entities: List[Dict[str, Any]], app_type: str) -> List[Dict[str, Any]]:
    """Identify relationships between extracted entities.

    Returns a list of {"source", "target", "type"} edge dicts referencing the
    entity ids produced by ``extract_entities``.
    """
    relationships: List[Dict[str, Any]] = []
    # Group entities by type for easier processing.
    entities_by_type: Dict[str, List[Dict[str, Any]]] = {}
    for entity in entities:
        entities_by_type.setdefault(entity["type"], []).append(entity)
    if app_type == "messaging":
        # Connect messages to their senders.
        messages = entities_by_type.get("message", [])
        people = {p["name"]: p for p in entities_by_type.get("person", [])}
        for message in messages:
            sender_name = message.get("sender")
            if sender_name not in people:
                continue
            relationships.append({
                "source": people[sender_name]["id"],
                "target": message["id"],
                "type": "authored",
            })
            if sender_name != "Me":
                # Incoming message: mark it as received by the user.
                relationships.append({
                    "source": message["id"],
                    "target": "me",
                    "type": "received_by",
                })
            else:
                # Outgoing message: link it to the first other participant.
                # Simplification - a real system would track conversation threads.
                for prev_message in messages:
                    if prev_message["sender"] == "Me" or prev_message["id"] == message["id"]:
                        continue
                    target = people.get(prev_message["sender"], {}).get("id")
                    # Fix: skip the edge entirely when the target cannot be
                    # resolved, instead of emitting one with target=None.
                    if target is not None:
                        relationships.append({
                            "source": message["id"],
                            "target": target,
                            "type": "sent_to",
                        })
                    break
    elif app_type == "github":
        # Connect PRs, comments, and people.
        prs = entities_by_type.get("pull_request", [])
        comments = entities_by_type.get("comment", [])
        people = {p["name"]: p for p in entities_by_type.get("person", [])}
        for comment in comments:
            author = people.get(comment.get("author"))
            if author is None:
                continue
            relationships.append({
                "source": author["id"],
                "target": comment["id"],
                "type": "authored",
            })
            # Connect comment to PR (simplification - assumes one PR per screen).
            if prs:
                relationships.append({
                    "source": comment["id"],
                    "target": prs[0]["id"],
                    "type": "comments_on",
                })
    # Connect every person to every mentioned team. This is a content-proximity
    # simplification - a real system would use more precise signals.
    for team in entities_by_type.get("team", []):
        for person in entities_by_type.get("person", []):
            relationships.append({
                "source": person["id"],
                "target": team["id"],
                "type": "member_of",
            })
    return relationships
def current_time():
    """Return the current local time as an ISO-8601 formatted string."""
    now = datetime.datetime.now()
    return now.isoformat()
def summarize_content(content: str) -> str:
    """Create a short summary of *content* for storage.

    In a real system an LLM would generate the summary; for this POC we
    truncate to the first 100 characters. Slicing already clamps to the
    string length, so no explicit min() is needed.
    """
    return content[:100]
@app.post("/ingest")
async def ingest_screen_data(screen_data: ScreenData, request: Request):
    """Ingest screen data from client"""
    # In a real system the user id would come from authentication.
    user_id = request.headers.get("X-User-ID", "default_user")
    content = screen_data.content
    metadata = screen_data.metadata or {}
    # Run the ingestion pipeline: classify -> extract -> relate.
    app_type = classify_application(content, metadata)
    entities = extract_entities(content, app_type)
    relationships = identify_relationships(entities, app_type)
    # Persist a single memory entry across the hybrid stores.
    entry_id = memory.add(
        {
            "app_type": app_type,
            "timestamp": current_time(),
            "entities": entities,
            "relationships": relationships,
            "raw_content": content,
            "metadata": metadata,
        },
        user_id,
    )
    return {"status": "success", "entry_id": entry_id, "app_type": app_type}
Now, let's implement the query processing logic:
# Mock LLM for generating responses
def mock_llm(prompt: str, context: Optional[List[Dict[str, Any]]] = None) -> str:
    """Mock LLM function to generate responses based on context.

    In a real system we'd call an actual LLM API; for this POC we pick a
    templated response by keyword-matching the prompt and scanning the
    retrieved context items.
    """
    context = context or []
    prompt_lower = prompt.lower()

    # Team identification query.
    if "team" in prompt_lower and "who" in prompt_lower:
        if not context:
            return "I couldn't find information about your team members."
        team_members = []
        for item in context:
            if item.get("type") == "person" and "member_of" in str(item):
                team_members.append(item.get("name", "Unknown"))
            # A "member_of" relationship points from a person to a team; resolve
            # the source id back to a person in the context. (Fix: the old check
            # required type == "relationship" AND type == "member_of", which is
            # always False - relationship items carry the edge type in "type".)
            if item.get("type") == "member_of":
                person_id = item.get("source")
                for c in context:
                    if c.get("id") == person_id and c.get("name") not in team_members:
                        team_members.append(c.get("name", "Unknown"))
        if team_members:
            return f"Based on your screen history, your team includes: {', '.join(team_members)}"
        return "I couldn't identify specific team members from your screen history."

    # Unreplied messages query.
    if "message" in prompt_lower and "haven't replied" in prompt_lower:
        if not context:
            return "I couldn't find any unreplied messages."
        unreplied = [
            {"sender": item.get("sender", "Unknown"), "content": item.get("content", "")}
            for item in context
            if item.get("type") == "message" and item.get("status") == "unreplied"
        ]
        if not unreplied:
            return "I couldn't find any unreplied messages in your chat history."
        # Fix: the reply templates previously contained escaped "\\n" and '\\"'
        # artifacts; they now emit real newlines and quotes.
        response = "Here are the messages you haven't replied to:\n\n"
        for msg in unreplied:
            response += f"From {msg['sender']}: \"{msg['content']}\"\n"
            response += "Suggested reply: \"Thank you for your message. I'll get back to you soon.\"\n\n"
        return response

    # PR reviews query. "pr" is matched on word boundaries so that words like
    # "approach" do not trigger this branch.
    if (("pull request" in prompt_lower or re.search(r"\bprs?\b", prompt_lower))
            and "review" in prompt_lower):
        if not context:
            return "I couldn't find any PRs that need your review."
        pending = [
            item.get("title", "Untitled PR")
            for item in context
            if item.get("type") == "pull_request" and item.get("review_status") == "pending"
        ]
        if not pending:
            return "I couldn't find any PRs that need your review in your screen history."
        return "Here are the PRs that need your review:\n\n" + "".join(f"- {t}\n" for t in pending)

    # Default response.
    return "I'm not sure how to answer that based on your screen history. Can you rephrase your question?"
def analyze_query(query: str) -> Dict[str, Any]:
    """Analyze the query to determine the memory search strategy.

    Returns a dict with the detected query ``type`` and a ``search_strategy``
    list of {context_type, query, filters} steps to run against the stores.
    """
    query_lower = query.lower()

    # Team identification query.
    if "team" in query_lower and any(term in query_lower for term in ["who", "people", "members"]):
        return {
            "type": "team_identification",
            "search_strategy": [
                {"context_type": "graph", "query": "team members", "filters": {}},
                {"context_type": "vector", "query": "team members colleagues", "filters": {}},
            ],
        }

    # Unreplied messages query.
    if "message" in query_lower and "haven't replied" in query_lower:
        app_filter = {"app_type": "messaging"} if "whatsapp" in query_lower else {}
        return {
            "type": "unreplied_messages",
            "search_strategy": [
                {"context_type": "key_value", "query": "unreplied messages",
                 "filters": {"status": "unreplied", **app_filter}},
                {"context_type": "vector", "query": "unreplied messages", "filters": app_filter},
            ],
        }

    # PR reviews query. "pr" is matched on word boundaries so that words like
    # "approach" or "april" do not misroute a general query here.
    if (("pull request" in query_lower or re.search(r"\bprs?\b", query_lower))
            and "review" in query_lower):
        return {
            "type": "pending_reviews",
            "search_strategy": [
                {"context_type": "key_value", "query": "pending PR reviews",
                 "filters": {"type": "pull_request", "review_status": "pending"}},
                {"context_type": "vector", "query": "pull requests that need review",
                 "filters": {"app_type": "github"}},
            ],
        }

    # Default: a plain vector search with the raw query.
    return {
        "type": "general",
        "search_strategy": [
            {"context_type": "vector", "query": query, "filters": {}},
        ],
    }
@app.post("/chat_completion")
async def chat_completion(chat_query: ChatQuery):
    """Process user query and generate response"""
    # Plan the retrieval strategy from the query text.
    analysis = analyze_query(chat_query.query)
    # Gather context by running every step of the planned search strategy.
    context = []
    for step in analysis["search_strategy"]:
        context.extend(
            memory.search(
                query=step["query"],
                user_id=chat_query.user_id,
                context_type=step["context_type"],
                filters=step["filters"],
            )
        )
    # Generate the answer from the collected context.
    answer = mock_llm(chat_query.query, context)
    # Surface up to five typed context items as sources.
    sources = [
        {"id": item["id"], "type": item["type"], "score": item.get("score", 0)}
        for item in context
        if "id" in item and "type" in item
    ]
    return {"response": answer, "sources": sources[:5]}