Spaces:
Sleeping
Sleeping
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| from typing import Any, Dict, List, Optional, Union | |
| import json | |
| import logging | |
| import numpy as np | |
| DEFAULT_EMBEDDING_DIM = 1536 | |
| DISTANCE_RATE = 0.8 | |
| class TimeType(str, Enum): | |
| RECENT = "recent" | |
| EARLIER = "earlier" | |
| MIN_MEMORIES_N = {TimeType.RECENT: 3, TimeType.EARLIER: 10} | |
| TIME_RANGE = {TimeType.RECENT: 60 * 60 * 24 * 1, TimeType.EARLIER: 60 * 60 * 24 * 7} | |
| class MemoryType(str, Enum): | |
| TEXT = "TEXT" | |
| MARKDOWN = "MARKDOWN" | |
| PDF = "PDF" | |
| LINK = "LINK" | |
| class AnalysisType(str, Enum): | |
| SUBJECT = "SUBJECT" | |
| OBJECT = "OBJECT" | |
| CHAT = "CHAT" | |
| def datetime2timestamp(time_str: str) -> float: | |
| """Convert datetime string to timestamp. | |
| Args: | |
| time_str: String representation of datetime in TIME_FORMAT format. | |
| Returns: | |
| Timestamp in seconds. | |
| Raises: | |
| Exception: If time_str has invalid format. | |
| """ | |
| try: | |
| timestamp = datetime.strptime(time_str, TIME_FORMAT).timestamp() | |
| return timestamp | |
| except Exception as e: | |
| logging.error(f"Invalid time format: {time_str}") | |
| raise e | |
| OBJECT_NOTE_TYPE = [MemoryType.LINK] | |
| SUBJECT_NOTE_TYPE = [ | |
| MemoryType.TEXT, | |
| MemoryType.MARKDOWN, | |
| MemoryType.PDF, | |
| ] | |
| TIME_FORMAT = "%Y-%m-%d %H:%M:%S" | |
| TAG_TYPE = { | |
| TimeType.RECENT: {"time": "Today", "default": "Recent"}, | |
| TimeType.EARLIER: {"time": "Earlier", "default": "Earlier"}, | |
| } | |
| class Chunk: | |
| """Represents a chunk of document content with embedding information.""" | |
| def __init__( | |
| self, | |
| id: int, | |
| document_id: int, | |
| content: str, | |
| embedding: Optional[Union[List[float], np.ndarray]] = None, | |
| tags: Optional[List[str]] = None, | |
| topic: Optional[str] = None, | |
| ): | |
| """Initialize a Chunk instance. | |
| Args: | |
| id: Unique identifier for the chunk. | |
| document_id: ID of the document this chunk belongs to. | |
| content: Text content of the chunk. | |
| embedding: Vector representation of the chunk content. | |
| tags: List of tags associated with the chunk. | |
| topic: Topic classification for the chunk. | |
| """ | |
| self.id = id | |
| self.document_id = document_id | |
| self.content = content | |
| self.embedding = embedding.squeeze() if embedding is not None else None | |
| self.tags = tags | |
| self.topic = topic | |
| class Note: | |
| """Represents a note with its content and metadata.""" | |
| def __init__( | |
| self, | |
| noteId: int = None, | |
| content: str = "", | |
| createTime: str = "", | |
| memoryType: str = "", | |
| embedding: Optional[Union[List[float], np.ndarray]] = None, | |
| chunks: List[Chunk] = None, | |
| title: str = "", | |
| summary: str = "", | |
| insight: str = "", | |
| tags: List[str] = None, | |
| topic: str = None, | |
| ): | |
| """Initialize a Note instance. | |
| Args: | |
| noteId: Unique identifier for the note. | |
| content: Text content of the note. | |
| createTime: Creation timestamp in string format. | |
| memoryType: Type of the memory (TEXT, MARKDOWN, etc.). | |
| embedding: Vector representation of the note content. | |
| chunks: List of chunks the note is divided into. | |
| title: Title of the note. | |
| summary: Summary of the note content. | |
| insight: Insights extracted from the note. | |
| tags: List of tags associated with the note. | |
| topic: Topic classification for the note. | |
| """ | |
| self.id = noteId | |
| self.content = content | |
| self.create_time = createTime | |
| self.memory_type = memoryType | |
| self.embedding = embedding.squeeze() if embedding is not None else None | |
| self.chunks = chunks or [] | |
| self.title = title | |
| self.summary = summary | |
| self.insight = insight | |
| self.tags = tags | |
| self.topic = topic | |
| def __str__(self) -> str: | |
| """Return a string representation of the note. | |
| Returns: | |
| Formatted string with note metadata and content. | |
| """ | |
| note_statement = "---\n" | |
| if self.id: | |
| note_statement += f"[ID]: {self.id}\n" | |
| if self.title: | |
| note_statement += f"[Title]: {self.title}\n" | |
| if self.create_time: | |
| note_statement += f"[Date]: {self.create_time}\n" | |
| if self.memory_type: | |
| note_statement += f"[Type]: {self.memory_type}\n" | |
| note_statement += "---\n\n" | |
| if self.summary: | |
| note_statement += f"----- Doc Summary -----\n{self.summary}\n\n" | |
| if self.insight: | |
| note_statement += f"----- Doc Insight -----\n{self.insight}\n\n" | |
| if not (self.insight or self.summary): | |
| note_statement += f"----- Doc Content -----\n{self.content[:4000]}\n\n" | |
| return note_statement | |
| def to_json(self) -> Dict[str, Any]: | |
| """Convert the note to a JSON-serializable dictionary. | |
| Returns: | |
| Dictionary representation of the note. | |
| """ | |
| if hasattr(self, "processed"): | |
| return { | |
| "id": self.id, | |
| "insight": self.insight, | |
| "summary": self.summary, | |
| "memory_type": self.memory_type, | |
| "create_time": self.create_time, | |
| "title": self.title, | |
| "content": self.content, | |
| "processed": self.processed, | |
| } | |
| else: | |
| return { | |
| "id": self.id, | |
| "insight": self.insight, | |
| "summary": self.summary, | |
| "memory_type": self.memory_type, | |
| "create_time": self.create_time, | |
| "title": self.title, | |
| "content": self.content, | |
| } | |
| def to_str(self, analysis_type: AnalysisType = None) -> str: | |
| """Convert the note to a string based on analysis type. | |
| Args: | |
| analysis_type: Type of analysis to determine format. | |
| Returns: | |
| Formatted string representation of the note. | |
| Raises: | |
| ValueError: If memory_type or analysis_type is invalid. | |
| """ | |
| if not analysis_type: | |
| if self.memory_type in SUBJECT_NOTE_TYPE: | |
| analysis_type = AnalysisType.SUBJECT | |
| elif self.memory_type in OBJECT_NOTE_TYPE: | |
| analysis_type = AnalysisType.OBJECT | |
| else: | |
| raise ValueError(f"Invalid memory type: {self.memory_type}") | |
| if analysis_type == AnalysisType.SUBJECT: | |
| return self.to_subject_str() | |
| elif analysis_type == AnalysisType.OBJECT: | |
| return self.to_object_str() | |
| else: | |
| raise ValueError(f"Invalid analysis type: {analysis_type}") | |
| def to_subject_str(self) -> str: | |
| """Convert the note to a string formatted as subject. | |
| Returns: | |
| Formatted string for subject analysis. | |
| """ | |
| note_statement = "---\n" | |
| if self.id: | |
| note_statement += f"[ID]: {self.id}\n" | |
| if self.title: | |
| note_statement += f"[Title]: {self.title}\n" | |
| if self.create_time: | |
| note_statement += f"[Date]: {self.create_time}\n" | |
| if self.memory_type: | |
| note_statement += f"[Type]: {self.memory_type}\n" | |
| note_statement += "---\n\n" | |
| if self.summary: | |
| note_statement += f"----- Doc Summary -----\n{self.summary}\n\n" | |
| if self.insight: | |
| note_statement += f"----- Doc Insight -----\n{self.insight}\n\n" | |
| if not (self.insight or self.summary): | |
| note_statement += f"----- Doc Content -----\n{self.content[:4000]}\n\n" | |
| return note_statement | |
| def to_object_str(self) -> str: | |
| """Convert the note to a string formatted as object. | |
| Returns: | |
| Formatted string for object analysis. | |
| """ | |
| note_statement = "---\n" | |
| if self.id: | |
| note_statement += f"[ID]: {self.id}\n" | |
| if self.title: | |
| note_statement += f"[Title]: {self.title}\n" | |
| if self.create_time: | |
| note_statement += f"[Read Time]: {self.create_time}\n" | |
| if self.memory_type: | |
| note_statement += f"[Meta Type]: {self.memory_type}\n" | |
| note_statement += "---\n\n" | |
| if self.summary: | |
| note_statement += f"----- Doc Summary -----\n{self.summary}\n\n" | |
| if not self.summary and self.insight: | |
| note_statement += f"----- Doc Insight -----\n{self.insight}\n\n" | |
| if not (self.insight or self.summary): | |
| note_statement += f"----- Doc Content -----\n{self.content[:4000]}\n\n" | |
| return note_statement | |
| class Memory: | |
| def __init__(self, memoryId: int, embedding: List[float] = None): | |
| self.memory_id = memoryId | |
| if embedding is not None: | |
| self.embedding = np.array(embedding).squeeze() | |
| else: | |
| self.embedding = None | |
| def to_json(self): | |
| return {"memoryId": self.memory_id} | |
| class Cluster: | |
| def __init__( | |
| self, | |
| clusterId: int, | |
| memoryList: List[Optional[Union[Dict, Memory]]] = [], | |
| centerEmbedding: List[float] = None, | |
| is_new=False, | |
| ): | |
| self.cluster_id = clusterId | |
| memory_list = [ | |
| memory if isinstance(memory, Memory) else Memory(**memory) | |
| for memory in memoryList | |
| ] | |
| self.memory_list = memory_list | |
| self.is_new = is_new | |
| self.size = len(memory_list) | |
| self.cluster_center = ( | |
| np.array(centerEmbedding) | |
| if centerEmbedding | |
| else np.zeros(DEFAULT_EMBEDDING_DIM) | |
| ) | |
| self.merge_list = [] | |
| def add_memory(self, memory: Memory): | |
| self.memory_list.append(memory) | |
| self.size += 1 | |
| self.get_cluster_center() | |
| def extend_memory_list(self, memory_list: List[Memory]): | |
| self.memory_list.extend(memory_list) | |
| self.size += len(memory_list) | |
| self.get_cluster_center() | |
| def get_cluster_center(self): | |
| if not self.memory_list: | |
| self.cluster_center = np.zeros(DEFAULT_EMBEDDING_DIM) | |
| else: | |
| self.cluster_center = np.mean( | |
| [memory.embedding for memory in self.memory_list], axis=0 | |
| ) | |
| def prune_outliers_from_cluster(self): | |
| if not self.memory_list: | |
| self.get_cluster_center() | |
| memory_list = sorted( | |
| self.memory_list, | |
| key=lambda x: np.linalg.norm(x.embedding - self.cluster_center), | |
| ) | |
| memory_list = memory_list[: max(int(self.size * DISTANCE_RATE), 1)] | |
| self.memory_list = memory_list | |
| self.size = len(memory_list) | |
| self.get_cluster_center() | |
| def to_json(self): | |
| return { | |
| "clusterId": self.cluster_id if not self.is_new else None, | |
| "memoryList": [memory.to_json() for memory in self.memory_list], | |
| "centerEmbedding": self.cluster_center.tolist(), | |
| "mergeList": self.merge_list, | |
| } | |
| class ShadeTimeline: | |
| def __init__( | |
| self, | |
| refMemoryId: int = None, | |
| createTime: str = "", | |
| descSecondView: str = "", | |
| descThirdView: str = "", | |
| is_new: bool = False, | |
| ): | |
| self.create_time = createTime | |
| self.ref_memory_id = refMemoryId | |
| self.desc_second_view = descSecondView | |
| self.desc_third_view = descThirdView | |
| self.is_new = is_new | |
| def from_raw_format(cls, raw_format: Dict[str, Any]): | |
| return cls( | |
| refMemoryId=raw_format.get("refMemoryId", None), | |
| createTime=raw_format.get("createTime", ""), | |
| descSecondView="", | |
| descThirdView=raw_format.get("description", ""), | |
| is_new=True, | |
| ) | |
| def add_second_view(self, description): | |
| self.desc_second_view = description | |
| def to_json(self): | |
| return { | |
| "createTime": self.create_time, | |
| "refMemoryId": self.ref_memory_id, | |
| "descThirdView": self.desc_third_view, | |
| "descSecondView": self.desc_second_view, | |
| } | |
| class ConfidenceLevel(str, Enum): | |
| VERY_LOW = "VERY LOW" | |
| LOW = "LOW" | |
| MEDIUM = "MEDIUM" | |
| HIGH = "HIGH" | |
| VERY_HIGH = "VERY HIGH" | |
| CONFIDENCE_LEVELS_INT = { | |
| ConfidenceLevel.VERY_LOW: 1, | |
| ConfidenceLevel.LOW: 2, | |
| ConfidenceLevel.MEDIUM: 3, | |
| ConfidenceLevel.HIGH: 4, | |
| ConfidenceLevel.VERY_HIGH: 5, | |
| } | |
| class ShadeInfo: | |
| def __init__( | |
| self, | |
| id: int = None, | |
| name: str = "", | |
| aspect: str = "", | |
| icon: str = "", | |
| descThirdView: str = "", | |
| contentThirdView: str = "", | |
| descSecondView: str = "", | |
| contentSecondView: str = "", | |
| timelines: List[Dict[str, Any]] = [], | |
| confidenceLevel: str = None, | |
| ): | |
| self.id = id | |
| self.name = name | |
| self.aspect = aspect | |
| self.icon = icon | |
| self.desc_second_view = descSecondView | |
| self.desc_third_view = descThirdView | |
| self.content_third_view = contentThirdView | |
| self.content_second_view = contentSecondView | |
| if confidenceLevel: | |
| self.confidence_level = ConfidenceLevel(confidenceLevel) | |
| else: | |
| self.confidence_level = None | |
| self.timelines = [ShadeTimeline(**timeline) for timeline in timelines] | |
| def imporve_shade_info( | |
| self, | |
| improveDesc: str, | |
| improveContent: str, | |
| improveTimelines: List[Dict[str, Any]], | |
| ): | |
| self.desc_third_view = improveDesc | |
| self.content_third_view = improveContent | |
| self.timelines.extend( | |
| [ShadeTimeline.from_raw_format(timeline) for timeline in improveTimelines] | |
| ) | |
| def add_second_view( | |
| self, | |
| domainDesc: str, | |
| domainContent: str, | |
| domainTimeline: List[Dict[str, Any]], | |
| *args, | |
| **kwargs, | |
| ): | |
| self.desc_second_view = domainDesc | |
| self.content_second_view = domainContent | |
| timelime_dict = { | |
| timelime.ref_memory_id: timelime for timelime in self.timelines | |
| } | |
| for timeline in domainTimeline: | |
| ref_memory_id = timeline.get("refMemoryId", None) | |
| if not (ref_memory_id and ref_memory_id in timelime_dict): | |
| logging.error( | |
| f"Timeline with refMemoryId {ref_memory_id} already exists, skipping" | |
| ) | |
| continue | |
| timelime_dict[ref_memory_id].add_second_view( | |
| timeline.get("description", "") | |
| ) | |
| def _preview_(self, second_view: bool = False): | |
| if second_view: | |
| return f"- **{self.name}**: {self.desc_second_view}" | |
| return f"- **{self.name}**: {self.desc_third_view}" | |
| def to_str(self): | |
| shade_statement = f"---\n**[Name]**: {self.name}\n**[Aspect]**: {self.aspect}\n**[Icon]**: {self.icon}\n" | |
| shade_statement += f"**[Description]**: \n{self.desc_third_view}\n\n**[Content]**: \n{self.content_third_view}\n" | |
| shade_statement += "---\n\n[Timelines]:\n" | |
| for timeline in self.timelines: | |
| shade_statement += f"- {timeline.create_time}, {timeline.desc_third_view}, {timeline.ref_memory_id}\n" | |
| return shade_statement | |
| def to_json(self): | |
| return { | |
| "id": self.id, | |
| "name": self.name, | |
| "aspect": self.aspect, | |
| "icon": self.icon, | |
| "descSecondView": self.desc_second_view, | |
| "descThirdView": self.desc_third_view, | |
| "contentThirdView": self.content_third_view, | |
| "contentSecondView": self.content_second_view, | |
| "confidenceLevel": self.confidence_level if self.confidence_level else None, | |
| "timelines": [timeline.to_json() for timeline in self.timelines], | |
| } | |
| class AttributeInfo: | |
| def __init__( | |
| self, | |
| id: int = None, | |
| name: str = "", | |
| description: str = "", | |
| confidenceLevel: Optional[Union[str, ConfidenceLevel]] = None, | |
| ): | |
| self.id = id | |
| self.name = name | |
| self.description = description | |
| if confidenceLevel and isinstance(confidenceLevel, str): | |
| self.confidence_level = ConfidenceLevel(confidenceLevel) | |
| elif isinstance(confidenceLevel, ConfidenceLevel): | |
| self.confidence_level = confidenceLevel | |
| else: | |
| self.confidence_level = None | |
| def to_str(self): | |
| # - **[Attribute Name]**: (Attribute Description), Confidence level: [LOW/MEDIUM/HIGH] | |
| return f"- **{self.name}**: {self.description}, Confidence level: {self.confidence_level.value}" | |
| def to_json(self): | |
| return { | |
| "id": self.id, | |
| "name": self.name, | |
| "description": self.description, | |
| "confidenceLevel": self.confidence_level.value | |
| if self.confidence_level | |
| else None, | |
| } | |
| class Bio: | |
| def __init__( | |
| self, | |
| contentThirdView: str = "", | |
| content: str = "", | |
| summaryThirdView: str = "", | |
| summary: str = "", | |
| attributeList: List[Dict[str, Any]] = [], | |
| shadesList: List[Dict[str, Any]] = [], | |
| ): | |
| self.content_third_view = contentThirdView | |
| self.content_second_view = content | |
| self.summary_third_view = summaryThirdView | |
| self.summary_second_view = summary | |
| self.attribute_list = sorted( | |
| [AttributeInfo(**attribute) for attribute in attributeList], | |
| key=lambda x: CONFIDENCE_LEVELS_INT[x.confidence_level], | |
| reverse=True, | |
| ) | |
| self.shades_list = sorted( | |
| [ShadeInfo(**shade) for shade in shadesList], | |
| key=lambda x: len(x.timelines), | |
| reverse=True, | |
| ) | |
| def to_str(self) -> str: | |
| global_bio_statement = "" | |
| if self.is_raw_bio(): | |
| global_bio_statement += ( | |
| f"**[Origin Analysis]**\n{self.summary_third_view}\n" | |
| ) | |
| # global_bio_statement += f"**[Identity Attributes]**\n" | |
| # global_bio_statement += '\n'.join([attribute.to_str() for attribute in self.attribute_list]) | |
| global_bio_statement += f"\n**[Current Shades]**\n" | |
| for shade in self.shades_list: | |
| global_bio_statement += shade.to_str() | |
| global_bio_statement += "\n==============\n" | |
| return global_bio_statement | |
| def complete_content(self, second_view: bool = False) -> str: | |
| interests_preference_field = ( | |
| "\n### User's Interests and Preferences ###\n" | |
| + "\n".join([shade._preview_(second_view) for shade in self.shades_list]) | |
| ) | |
| if not second_view: | |
| conclusion_field = "\n### Conclusion ###\n" + self.summary_third_view | |
| else: | |
| conclusion_field = "\n### Conclusion ###\n" + self.summary_second_view | |
| return f"""## Comprehensive Analysis Report ## | |
| {interests_preference_field} | |
| {conclusion_field}""" | |
| def is_raw_bio(self) -> bool: | |
| if not self.content_third_view and not self.summary_third_view: | |
| return True | |
| return False | |
| def to_json(self) -> Dict[str, Any]: | |
| return { | |
| "contentThirdView": self.content_third_view, | |
| "content": self.content_second_view, | |
| "summaryThirdView": self.summary_third_view, | |
| "summary": self.summary_second_view, | |
| "shadesList": [shade.to_json() for shade in self.shades_list], | |
| } | |
| class ShadeMergeInfo: | |
| def __init__( | |
| self, | |
| id: int = None, | |
| name: str = "", | |
| aspect: str = "", | |
| icon: str = "", | |
| desc_third_view: str = "", | |
| content_third_view: str = "", | |
| desc_second_view: str = "", | |
| content_second_view: str = "", | |
| cluster_info: Optional[Dict[str, Any]] = None, | |
| ): | |
| self.id = id | |
| self.name = name | |
| self.aspect = aspect | |
| self.icon = icon | |
| self.desc_second_view = desc_second_view | |
| self.desc_third_view = desc_third_view | |
| self.content_third_view = content_third_view | |
| self.content_second_view = content_second_view | |
| self.cluster_info = cluster_info | |
| def improve_shade_info(self, improveDesc: str, improveContent: str): | |
| self.desc_third_view = improveDesc | |
| self.content_third_view = improveContent | |
| def add_second_view(self, domainDesc: str, domainContent: str): | |
| self.desc_second_view = domainDesc | |
| self.content_second_view = domainContent | |
| def _preview_(self, second_view: bool = False): | |
| if second_view: | |
| return f"- **{self.name}**: {self.desc_second_view}" | |
| return f"- **{self.name}**: {self.desc_third_view}" | |
| def to_str(self): | |
| shade_statement = f"---\n**[Name]**: {self.name}\n**[Aspect]**: {self.aspect}\n**[Icon]**: {self.icon}\n" | |
| shade_statement += f"**[Description]**: \n{self.desc_third_view}\n\n**[Content]**: \n{self.content_third_view}\n" | |
| shade_statement += "---\n\n" | |
| if self.cluster_info: | |
| shade_statement += ( | |
| f"**[Cluster Info]**: \n{json.dumps(self.cluster_info, indent=2)}\n" | |
| ) | |
| return shade_statement | |
| def to_json(self): | |
| return { | |
| "id": self.id, | |
| "name": self.name, | |
| "aspect": self.aspect, | |
| "icon": self.icon, | |
| "descSecondView": self.desc_second_view, | |
| "descThirdView": self.desc_third_view, | |
| "contentThirdView": self.content_third_view, | |
| "contentSecondView": self.content_second_view, | |
| "clusterInfo": self.cluster_info, | |
| } | |
| class ShadeMergeResponse: | |
| def __init__(self, result: Any, success: bool): | |
| self.success: bool = success | |
| self.message: str = "" | |
| self.merge_shade_list: Optional[List[Dict[str, Any]]] = None | |
| if not success: | |
| self.message = result if isinstance(result, str) else "Error occurred" | |
| logging.error(self.message) | |
| else: | |
| self.message = "Success" | |
| self.merge_shade_list = result.get("mergeShadeList") | |
| def to_json(self) -> dict: | |
| return { | |
| "success": self.success, | |
| "message": self.message, | |
| "mergeShadeList": self.merge_shade_list, | |
| } | |
| class Todo: | |
| def __init__( | |
| self, | |
| todoId: int = 0, | |
| content: str = "", | |
| deadlineTime: str = "", | |
| createTime: str = "", | |
| status: str = "Done", | |
| ) -> None: | |
| self.todo_id = todoId | |
| self.content = content | |
| self.deadline_time = deadlineTime | |
| self.create_time = createTime | |
| self.status = status | |
| def __str__(self): | |
| todo_statement = "---\n" | |
| todo_statement += f"[Action] User have a Plan\n" | |
| if self.content: | |
| todo_statement += f"[Content]: {self.content}\n" | |
| if self.create_time: | |
| todo_statement += f"[Create Time]: {self.create_time}\n" | |
| if self.deadline_time: | |
| todo_statement += f"[Deadline Time]: {self.deadline_time}\n" | |
| if self.status: | |
| todo_statement += f"[Status]: {self.status}\n" | |
| return todo_statement | |
| class Chat: | |
| def __init__( | |
| self, | |
| sessionId: str = "", | |
| summary: str = "", | |
| title: str = "", | |
| createTime: str = "", | |
| ) -> None: | |
| self.session_id = sessionId | |
| self.summary = summary | |
| self.title = title | |
| self.create_time = createTime | |
| def __str__(self): | |
| chat_statement = "---\n" | |
| chat_statement += f"[Action] User had a chat\n" | |
| if self.create_time: | |
| chat_statement += f"[Create Time]: {self.create_time}\n" | |
| if self.title: | |
| chat_statement += f"[Title]: {self.title}\n" | |
| if self.summary: | |
| chat_statement += f"{self.summary}\n" | |
| return chat_statement | |
| class UserInfo: | |
| def __init__( | |
| self, cur_time: str, notes: List[Note], todos: List[Todo], chats: List[Chat] | |
| ): | |
| self.notes = notes | |
| self.todos = todos | |
| self.chats = chats | |
| self.cur_time = cur_time | |
| self.recent_tag = {k: v["default"] for k, v in TAG_TYPE.items()} | |
| self.memories = sorted( | |
| notes + todos + chats, | |
| key=lambda x: datetime2timestamp(x.create_time), | |
| reverse=True, | |
| ) | |
| self.recent_memories = self.get_range_memories(TimeType.RECENT) | |
| self.earlier_memories = self.get_range_memories(TimeType.EARLIER)[ | |
| len(self.recent_memories) : | |
| ] | |
| def __str__(self): | |
| user_memories_statement = "### {recent_type} Memory ###\n".format( | |
| recent_type=self.recent_tag[TimeType.RECENT] | |
| ) | |
| user_memories_statement += "".join( | |
| [str(memory) for memory in self.recent_memories] | |
| ) | |
| user_memories_statement += "\n\n### Earlier Memory ###\n" | |
| user_memories_statement += "".join( | |
| [str(memory) for memory in self.earlier_memories] | |
| ) | |
| return user_memories_statement | |
| def get_range_memories(self, time_type: TimeType) -> List[Union[Note, Todo, Chat]]: | |
| if len(self.memories) < MIN_MEMORIES_N[time_type]: | |
| return self.memories | |
| recent_memories = [] | |
| cur_datetime = datetime.fromtimestamp(datetime2timestamp(self.cur_time)) | |
| end_datetime = cur_datetime + timedelta(days=1) | |
| end_timestamp = end_datetime.replace(hour=0, minute=0, second=0).timestamp() | |
| for memory in self.memories: | |
| if ( | |
| end_timestamp - datetime2timestamp(memory.create_time) | |
| < TIME_RANGE[time_type] | |
| ): | |
| recent_memories.append(memory) | |
| else: | |
| break | |
| if len(recent_memories) >= MIN_MEMORIES_N[time_type]: | |
| self.recent_tag[time_type] = TAG_TYPE[time_type].get( | |
| "time", TAG_TYPE[time_type].get("default") | |
| ) | |
| return recent_memories | |
| return self.memories[: MIN_MEMORIES_N[time_type]] | |