Gemini
feat: add detailed logging
01d5a5d
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Union
import json
import logging
import numpy as np
DEFAULT_EMBEDDING_DIM = 1536
DISTANCE_RATE = 0.8
class TimeType(str, Enum):
RECENT = "recent"
EARLIER = "earlier"
MIN_MEMORIES_N = {TimeType.RECENT: 3, TimeType.EARLIER: 10}
TIME_RANGE = {TimeType.RECENT: 60 * 60 * 24 * 1, TimeType.EARLIER: 60 * 60 * 24 * 7}
class MemoryType(str, Enum):
TEXT = "TEXT"
MARKDOWN = "MARKDOWN"
PDF = "PDF"
LINK = "LINK"
class AnalysisType(str, Enum):
SUBJECT = "SUBJECT"
OBJECT = "OBJECT"
CHAT = "CHAT"
def datetime2timestamp(time_str: str) -> float:
"""Convert datetime string to timestamp.
Args:
time_str: String representation of datetime in TIME_FORMAT format.
Returns:
Timestamp in seconds.
Raises:
Exception: If time_str has invalid format.
"""
try:
timestamp = datetime.strptime(time_str, TIME_FORMAT).timestamp()
return timestamp
except Exception as e:
logging.error(f"Invalid time format: {time_str}")
raise e
OBJECT_NOTE_TYPE = [MemoryType.LINK]
SUBJECT_NOTE_TYPE = [
MemoryType.TEXT,
MemoryType.MARKDOWN,
MemoryType.PDF,
]
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
TAG_TYPE = {
TimeType.RECENT: {"time": "Today", "default": "Recent"},
TimeType.EARLIER: {"time": "Earlier", "default": "Earlier"},
}
class Chunk:
"""Represents a chunk of document content with embedding information."""
def __init__(
self,
id: int,
document_id: int,
content: str,
embedding: Optional[Union[List[float], np.ndarray]] = None,
tags: Optional[List[str]] = None,
topic: Optional[str] = None,
):
"""Initialize a Chunk instance.
Args:
id: Unique identifier for the chunk.
document_id: ID of the document this chunk belongs to.
content: Text content of the chunk.
embedding: Vector representation of the chunk content.
tags: List of tags associated with the chunk.
topic: Topic classification for the chunk.
"""
self.id = id
self.document_id = document_id
self.content = content
self.embedding = embedding.squeeze() if embedding is not None else None
self.tags = tags
self.topic = topic
class Note:
"""Represents a note with its content and metadata."""
def __init__(
self,
noteId: int = None,
content: str = "",
createTime: str = "",
memoryType: str = "",
embedding: Optional[Union[List[float], np.ndarray]] = None,
chunks: List[Chunk] = None,
title: str = "",
summary: str = "",
insight: str = "",
tags: List[str] = None,
topic: str = None,
):
"""Initialize a Note instance.
Args:
noteId: Unique identifier for the note.
content: Text content of the note.
createTime: Creation timestamp in string format.
memoryType: Type of the memory (TEXT, MARKDOWN, etc.).
embedding: Vector representation of the note content.
chunks: List of chunks the note is divided into.
title: Title of the note.
summary: Summary of the note content.
insight: Insights extracted from the note.
tags: List of tags associated with the note.
topic: Topic classification for the note.
"""
self.id = noteId
self.content = content
self.create_time = createTime
self.memory_type = memoryType
self.embedding = embedding.squeeze() if embedding is not None else None
self.chunks = chunks or []
self.title = title
self.summary = summary
self.insight = insight
self.tags = tags
self.topic = topic
def __str__(self) -> str:
"""Return a string representation of the note.
Returns:
Formatted string with note metadata and content.
"""
note_statement = "---\n"
if self.id:
note_statement += f"[ID]: {self.id}\n"
if self.title:
note_statement += f"[Title]: {self.title}\n"
if self.create_time:
note_statement += f"[Date]: {self.create_time}\n"
if self.memory_type:
note_statement += f"[Type]: {self.memory_type}\n"
note_statement += "---\n\n"
if self.summary:
note_statement += f"----- Doc Summary -----\n{self.summary}\n\n"
if self.insight:
note_statement += f"----- Doc Insight -----\n{self.insight}\n\n"
if not (self.insight or self.summary):
note_statement += f"----- Doc Content -----\n{self.content[:4000]}\n\n"
return note_statement
def to_json(self) -> Dict[str, Any]:
"""Convert the note to a JSON-serializable dictionary.
Returns:
Dictionary representation of the note.
"""
if hasattr(self, "processed"):
return {
"id": self.id,
"insight": self.insight,
"summary": self.summary,
"memory_type": self.memory_type,
"create_time": self.create_time,
"title": self.title,
"content": self.content,
"processed": self.processed,
}
else:
return {
"id": self.id,
"insight": self.insight,
"summary": self.summary,
"memory_type": self.memory_type,
"create_time": self.create_time,
"title": self.title,
"content": self.content,
}
def to_str(self, analysis_type: AnalysisType = None) -> str:
"""Convert the note to a string based on analysis type.
Args:
analysis_type: Type of analysis to determine format.
Returns:
Formatted string representation of the note.
Raises:
ValueError: If memory_type or analysis_type is invalid.
"""
if not analysis_type:
if self.memory_type in SUBJECT_NOTE_TYPE:
analysis_type = AnalysisType.SUBJECT
elif self.memory_type in OBJECT_NOTE_TYPE:
analysis_type = AnalysisType.OBJECT
else:
raise ValueError(f"Invalid memory type: {self.memory_type}")
if analysis_type == AnalysisType.SUBJECT:
return self.to_subject_str()
elif analysis_type == AnalysisType.OBJECT:
return self.to_object_str()
else:
raise ValueError(f"Invalid analysis type: {analysis_type}")
def to_subject_str(self) -> str:
"""Convert the note to a string formatted as subject.
Returns:
Formatted string for subject analysis.
"""
note_statement = "---\n"
if self.id:
note_statement += f"[ID]: {self.id}\n"
if self.title:
note_statement += f"[Title]: {self.title}\n"
if self.create_time:
note_statement += f"[Date]: {self.create_time}\n"
if self.memory_type:
note_statement += f"[Type]: {self.memory_type}\n"
note_statement += "---\n\n"
if self.summary:
note_statement += f"----- Doc Summary -----\n{self.summary}\n\n"
if self.insight:
note_statement += f"----- Doc Insight -----\n{self.insight}\n\n"
if not (self.insight or self.summary):
note_statement += f"----- Doc Content -----\n{self.content[:4000]}\n\n"
return note_statement
def to_object_str(self) -> str:
"""Convert the note to a string formatted as object.
Returns:
Formatted string for object analysis.
"""
note_statement = "---\n"
if self.id:
note_statement += f"[ID]: {self.id}\n"
if self.title:
note_statement += f"[Title]: {self.title}\n"
if self.create_time:
note_statement += f"[Read Time]: {self.create_time}\n"
if self.memory_type:
note_statement += f"[Meta Type]: {self.memory_type}\n"
note_statement += "---\n\n"
if self.summary:
note_statement += f"----- Doc Summary -----\n{self.summary}\n\n"
if not self.summary and self.insight:
note_statement += f"----- Doc Insight -----\n{self.insight}\n\n"
if not (self.insight or self.summary):
note_statement += f"----- Doc Content -----\n{self.content[:4000]}\n\n"
return note_statement
class Memory:
def __init__(self, memoryId: int, embedding: List[float] = None):
self.memory_id = memoryId
if embedding is not None:
self.embedding = np.array(embedding).squeeze()
else:
self.embedding = None
def to_json(self):
return {"memoryId": self.memory_id}
class Cluster:
def __init__(
self,
clusterId: int,
memoryList: List[Optional[Union[Dict, Memory]]] = [],
centerEmbedding: List[float] = None,
is_new=False,
):
self.cluster_id = clusterId
memory_list = [
memory if isinstance(memory, Memory) else Memory(**memory)
for memory in memoryList
]
self.memory_list = memory_list
self.is_new = is_new
self.size = len(memory_list)
self.cluster_center = (
np.array(centerEmbedding)
if centerEmbedding
else np.zeros(DEFAULT_EMBEDDING_DIM)
)
self.merge_list = []
def add_memory(self, memory: Memory):
self.memory_list.append(memory)
self.size += 1
self.get_cluster_center()
def extend_memory_list(self, memory_list: List[Memory]):
self.memory_list.extend(memory_list)
self.size += len(memory_list)
self.get_cluster_center()
def get_cluster_center(self):
if not self.memory_list:
self.cluster_center = np.zeros(DEFAULT_EMBEDDING_DIM)
else:
self.cluster_center = np.mean(
[memory.embedding for memory in self.memory_list], axis=0
)
def prune_outliers_from_cluster(self):
if not self.memory_list:
self.get_cluster_center()
memory_list = sorted(
self.memory_list,
key=lambda x: np.linalg.norm(x.embedding - self.cluster_center),
)
memory_list = memory_list[: max(int(self.size * DISTANCE_RATE), 1)]
self.memory_list = memory_list
self.size = len(memory_list)
self.get_cluster_center()
def to_json(self):
return {
"clusterId": self.cluster_id if not self.is_new else None,
"memoryList": [memory.to_json() for memory in self.memory_list],
"centerEmbedding": self.cluster_center.tolist(),
"mergeList": self.merge_list,
}
class ShadeTimeline:
def __init__(
self,
refMemoryId: int = None,
createTime: str = "",
descSecondView: str = "",
descThirdView: str = "",
is_new: bool = False,
):
self.create_time = createTime
self.ref_memory_id = refMemoryId
self.desc_second_view = descSecondView
self.desc_third_view = descThirdView
self.is_new = is_new
@classmethod
def from_raw_format(cls, raw_format: Dict[str, Any]):
return cls(
refMemoryId=raw_format.get("refMemoryId", None),
createTime=raw_format.get("createTime", ""),
descSecondView="",
descThirdView=raw_format.get("description", ""),
is_new=True,
)
def add_second_view(self, description):
self.desc_second_view = description
def to_json(self):
return {
"createTime": self.create_time,
"refMemoryId": self.ref_memory_id,
"descThirdView": self.desc_third_view,
"descSecondView": self.desc_second_view,
}
class ConfidenceLevel(str, Enum):
VERY_LOW = "VERY LOW"
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
VERY_HIGH = "VERY HIGH"
CONFIDENCE_LEVELS_INT = {
ConfidenceLevel.VERY_LOW: 1,
ConfidenceLevel.LOW: 2,
ConfidenceLevel.MEDIUM: 3,
ConfidenceLevel.HIGH: 4,
ConfidenceLevel.VERY_HIGH: 5,
}
class ShadeInfo:
def __init__(
self,
id: int = None,
name: str = "",
aspect: str = "",
icon: str = "",
descThirdView: str = "",
contentThirdView: str = "",
descSecondView: str = "",
contentSecondView: str = "",
timelines: List[Dict[str, Any]] = [],
confidenceLevel: str = None,
):
self.id = id
self.name = name
self.aspect = aspect
self.icon = icon
self.desc_second_view = descSecondView
self.desc_third_view = descThirdView
self.content_third_view = contentThirdView
self.content_second_view = contentSecondView
if confidenceLevel:
self.confidence_level = ConfidenceLevel(confidenceLevel)
else:
self.confidence_level = None
self.timelines = [ShadeTimeline(**timeline) for timeline in timelines]
def imporve_shade_info(
self,
improveDesc: str,
improveContent: str,
improveTimelines: List[Dict[str, Any]],
):
self.desc_third_view = improveDesc
self.content_third_view = improveContent
self.timelines.extend(
[ShadeTimeline.from_raw_format(timeline) for timeline in improveTimelines]
)
def add_second_view(
self,
domainDesc: str,
domainContent: str,
domainTimeline: List[Dict[str, Any]],
*args,
**kwargs,
):
self.desc_second_view = domainDesc
self.content_second_view = domainContent
timelime_dict = {
timelime.ref_memory_id: timelime for timelime in self.timelines
}
for timeline in domainTimeline:
ref_memory_id = timeline.get("refMemoryId", None)
if not (ref_memory_id and ref_memory_id in timelime_dict):
logging.error(
f"Timeline with refMemoryId {ref_memory_id} already exists, skipping"
)
continue
timelime_dict[ref_memory_id].add_second_view(
timeline.get("description", "")
)
def _preview_(self, second_view: bool = False):
if second_view:
return f"- **{self.name}**: {self.desc_second_view}"
return f"- **{self.name}**: {self.desc_third_view}"
def to_str(self):
shade_statement = f"---\n**[Name]**: {self.name}\n**[Aspect]**: {self.aspect}\n**[Icon]**: {self.icon}\n"
shade_statement += f"**[Description]**: \n{self.desc_third_view}\n\n**[Content]**: \n{self.content_third_view}\n"
shade_statement += "---\n\n[Timelines]:\n"
for timeline in self.timelines:
shade_statement += f"- {timeline.create_time}, {timeline.desc_third_view}, {timeline.ref_memory_id}\n"
return shade_statement
def to_json(self):
return {
"id": self.id,
"name": self.name,
"aspect": self.aspect,
"icon": self.icon,
"descSecondView": self.desc_second_view,
"descThirdView": self.desc_third_view,
"contentThirdView": self.content_third_view,
"contentSecondView": self.content_second_view,
"confidenceLevel": self.confidence_level if self.confidence_level else None,
"timelines": [timeline.to_json() for timeline in self.timelines],
}
class AttributeInfo:
def __init__(
self,
id: int = None,
name: str = "",
description: str = "",
confidenceLevel: Optional[Union[str, ConfidenceLevel]] = None,
):
self.id = id
self.name = name
self.description = description
if confidenceLevel and isinstance(confidenceLevel, str):
self.confidence_level = ConfidenceLevel(confidenceLevel)
elif isinstance(confidenceLevel, ConfidenceLevel):
self.confidence_level = confidenceLevel
else:
self.confidence_level = None
def to_str(self):
# - **[Attribute Name]**: (Attribute Description), Confidence level: [LOW/MEDIUM/HIGH]
return f"- **{self.name}**: {self.description}, Confidence level: {self.confidence_level.value}"
def to_json(self):
return {
"id": self.id,
"name": self.name,
"description": self.description,
"confidenceLevel": self.confidence_level.value
if self.confidence_level
else None,
}
class Bio:
def __init__(
self,
contentThirdView: str = "",
content: str = "",
summaryThirdView: str = "",
summary: str = "",
attributeList: List[Dict[str, Any]] = [],
shadesList: List[Dict[str, Any]] = [],
):
self.content_third_view = contentThirdView
self.content_second_view = content
self.summary_third_view = summaryThirdView
self.summary_second_view = summary
self.attribute_list = sorted(
[AttributeInfo(**attribute) for attribute in attributeList],
key=lambda x: CONFIDENCE_LEVELS_INT[x.confidence_level],
reverse=True,
)
self.shades_list = sorted(
[ShadeInfo(**shade) for shade in shadesList],
key=lambda x: len(x.timelines),
reverse=True,
)
def to_str(self) -> str:
global_bio_statement = ""
if self.is_raw_bio():
global_bio_statement += (
f"**[Origin Analysis]**\n{self.summary_third_view}\n"
)
# global_bio_statement += f"**[Identity Attributes]**\n"
# global_bio_statement += '\n'.join([attribute.to_str() for attribute in self.attribute_list])
global_bio_statement += f"\n**[Current Shades]**\n"
for shade in self.shades_list:
global_bio_statement += shade.to_str()
global_bio_statement += "\n==============\n"
return global_bio_statement
def complete_content(self, second_view: bool = False) -> str:
interests_preference_field = (
"\n### User's Interests and Preferences ###\n"
+ "\n".join([shade._preview_(second_view) for shade in self.shades_list])
)
if not second_view:
conclusion_field = "\n### Conclusion ###\n" + self.summary_third_view
else:
conclusion_field = "\n### Conclusion ###\n" + self.summary_second_view
return f"""## Comprehensive Analysis Report ##
{interests_preference_field}
{conclusion_field}"""
def is_raw_bio(self) -> bool:
if not self.content_third_view and not self.summary_third_view:
return True
return False
def to_json(self) -> Dict[str, Any]:
return {
"contentThirdView": self.content_third_view,
"content": self.content_second_view,
"summaryThirdView": self.summary_third_view,
"summary": self.summary_second_view,
"shadesList": [shade.to_json() for shade in self.shades_list],
}
class ShadeMergeInfo:
def __init__(
self,
id: int = None,
name: str = "",
aspect: str = "",
icon: str = "",
desc_third_view: str = "",
content_third_view: str = "",
desc_second_view: str = "",
content_second_view: str = "",
cluster_info: Optional[Dict[str, Any]] = None,
):
self.id = id
self.name = name
self.aspect = aspect
self.icon = icon
self.desc_second_view = desc_second_view
self.desc_third_view = desc_third_view
self.content_third_view = content_third_view
self.content_second_view = content_second_view
self.cluster_info = cluster_info
def improve_shade_info(self, improveDesc: str, improveContent: str):
self.desc_third_view = improveDesc
self.content_third_view = improveContent
def add_second_view(self, domainDesc: str, domainContent: str):
self.desc_second_view = domainDesc
self.content_second_view = domainContent
def _preview_(self, second_view: bool = False):
if second_view:
return f"- **{self.name}**: {self.desc_second_view}"
return f"- **{self.name}**: {self.desc_third_view}"
def to_str(self):
shade_statement = f"---\n**[Name]**: {self.name}\n**[Aspect]**: {self.aspect}\n**[Icon]**: {self.icon}\n"
shade_statement += f"**[Description]**: \n{self.desc_third_view}\n\n**[Content]**: \n{self.content_third_view}\n"
shade_statement += "---\n\n"
if self.cluster_info:
shade_statement += (
f"**[Cluster Info]**: \n{json.dumps(self.cluster_info, indent=2)}\n"
)
return shade_statement
def to_json(self):
return {
"id": self.id,
"name": self.name,
"aspect": self.aspect,
"icon": self.icon,
"descSecondView": self.desc_second_view,
"descThirdView": self.desc_third_view,
"contentThirdView": self.content_third_view,
"contentSecondView": self.content_second_view,
"clusterInfo": self.cluster_info,
}
class ShadeMergeResponse:
def __init__(self, result: Any, success: bool):
self.success: bool = success
self.message: str = ""
self.merge_shade_list: Optional[List[Dict[str, Any]]] = None
if not success:
self.message = result if isinstance(result, str) else "Error occurred"
logging.error(self.message)
else:
self.message = "Success"
self.merge_shade_list = result.get("mergeShadeList")
def to_json(self) -> dict:
return {
"success": self.success,
"message": self.message,
"mergeShadeList": self.merge_shade_list,
}
class Todo:
def __init__(
self,
todoId: int = 0,
content: str = "",
deadlineTime: str = "",
createTime: str = "",
status: str = "Done",
) -> None:
self.todo_id = todoId
self.content = content
self.deadline_time = deadlineTime
self.create_time = createTime
self.status = status
def __str__(self):
todo_statement = "---\n"
todo_statement += f"[Action] User have a Plan\n"
if self.content:
todo_statement += f"[Content]: {self.content}\n"
if self.create_time:
todo_statement += f"[Create Time]: {self.create_time}\n"
if self.deadline_time:
todo_statement += f"[Deadline Time]: {self.deadline_time}\n"
if self.status:
todo_statement += f"[Status]: {self.status}\n"
return todo_statement
class Chat:
def __init__(
self,
sessionId: str = "",
summary: str = "",
title: str = "",
createTime: str = "",
) -> None:
self.session_id = sessionId
self.summary = summary
self.title = title
self.create_time = createTime
def __str__(self):
chat_statement = "---\n"
chat_statement += f"[Action] User had a chat\n"
if self.create_time:
chat_statement += f"[Create Time]: {self.create_time}\n"
if self.title:
chat_statement += f"[Title]: {self.title}\n"
if self.summary:
chat_statement += f"{self.summary}\n"
return chat_statement
class UserInfo:
def __init__(
self, cur_time: str, notes: List[Note], todos: List[Todo], chats: List[Chat]
):
self.notes = notes
self.todos = todos
self.chats = chats
self.cur_time = cur_time
self.recent_tag = {k: v["default"] for k, v in TAG_TYPE.items()}
self.memories = sorted(
notes + todos + chats,
key=lambda x: datetime2timestamp(x.create_time),
reverse=True,
)
self.recent_memories = self.get_range_memories(TimeType.RECENT)
self.earlier_memories = self.get_range_memories(TimeType.EARLIER)[
len(self.recent_memories) :
]
def __str__(self):
user_memories_statement = "### {recent_type} Memory ###\n".format(
recent_type=self.recent_tag[TimeType.RECENT]
)
user_memories_statement += "".join(
[str(memory) for memory in self.recent_memories]
)
user_memories_statement += "\n\n### Earlier Memory ###\n"
user_memories_statement += "".join(
[str(memory) for memory in self.earlier_memories]
)
return user_memories_statement
def get_range_memories(self, time_type: TimeType) -> List[Union[Note, Todo, Chat]]:
if len(self.memories) < MIN_MEMORIES_N[time_type]:
return self.memories
recent_memories = []
cur_datetime = datetime.fromtimestamp(datetime2timestamp(self.cur_time))
end_datetime = cur_datetime + timedelta(days=1)
end_timestamp = end_datetime.replace(hour=0, minute=0, second=0).timestamp()
for memory in self.memories:
if (
end_timestamp - datetime2timestamp(memory.create_time)
< TIME_RANGE[time_type]
):
recent_memories.append(memory)
else:
break
if len(recent_memories) >= MIN_MEMORIES_N[time_type]:
self.recent_tag[time_type] = TAG_TYPE[time_type].get(
"time", TAG_TYPE[time_type].get("default")
)
return recent_memories
return self.memories[: MIN_MEMORIES_N[time_type]]