from copy import deepcopy
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional
import logging
import os

from openai import OpenAI

from lpm_kernel.L1.bio import (
    Bio,
    CONFIDENCE_LEVELS_INT,
    Chat,
    Cluster,
    Memory,
    Note,
    ShadeInfo,
    ShadeMergeInfo,
    Todo,
)
from lpm_kernel.L1.prompt import (
    COMMON_PERSPECTIVE_SHIFT_SYSTEM_PROMPT,
    GLOBAL_BIO_SYSTEM_PROMPT,
    PREFER_LANGUAGE_SYSTEM_PROMPT,
    SHADE_MERGE_DEFAULT_SYSTEM_PROMPT,
)
from lpm_kernel.L1.shade_generator import ShadeGenerator, ShadeMerger
from lpm_kernel.L1.status_bio_generator import StatusBioGenerator
from lpm_kernel.L1.topics_generator import TopicsGenerator
from lpm_kernel.api.services.user_llm_config_service import UserLLMConfigService
from lpm_kernel.configs.config import Config
from lpm_kernel.configs.logging import get_train_process_logger

logger = get_train_process_logger()

DATE_TIME_FORMAT = "%Y-%m-%d"


class ConfidenceLevel(str, Enum):
    VERY_LOW = "VERY LOW"
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    VERY_HIGH = "VERY HIGH"


IMPORTANCE_TO_CONFIDENCE = {
    1: ConfidenceLevel.VERY_LOW,
    2: ConfidenceLevel.LOW,
    3: ConfidenceLevel.MEDIUM,
    4: ConfidenceLevel.HIGH,
    5: ConfidenceLevel.VERY_HIGH,
}
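
# Example lookups (this table is consumed by L1Generator._assign_confidence_level
# below to bucket shades by rank):
#
#     IMPORTANCE_TO_CONFIDENCE[5]  # ConfidenceLevel.VERY_HIGH ("VERY HIGH")
#     IMPORTANCE_TO_CONFIDENCE[1]  # ConfidenceLevel.VERY_LOW ("VERY LOW")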


class DailyTimeline:
    def __init__(self, id: int, dateTime: str, content: str, noteIds: List[int]):
        self.id = id
        self.date_time = dateTime
        self.content = content.strip()
        self.note_ids = noteIds

    def _desc_(self) -> str:
        """Returns a string representation of the daily timeline.

        Returns:
            str: Formatted string representation.
        """
        return f"- [{self.date_time}] {self.content}".strip()

    def to_dict(self) -> Dict[str, Any]:
        """Converts the DailyTimeline object to a dictionary.

        Returns:
            Dict[str, Any]: Dictionary representation of the DailyTimeline.
        """
        return {
            "id": self.id,
            "dateTime": self.date_time,
            "content": self.content,
            "noteIds": self.note_ids,
        }


class MonthlyTimeline:
    def __init__(
        self, id: int, monthDate: str, title: str, dailyTimelines: List[Dict[str, Any]]
    ):
        self.id = id
        self.month_date = monthDate
        self.title = title
        daily_timelines = [
            DailyTimeline(**daily_timeline) for daily_timeline in dailyTimelines
        ]
        self.daily_timelines = sorted(
            daily_timelines,
            key=lambda x: datetime.strptime(x.date_time, DATE_TIME_FORMAT),
        )

    def _desc_(self) -> str:
        """Returns a string representation of the monthly timeline.

        Returns:
            str: Formatted string representation.
        """
        return f"** {self.month_date} **\n" + "\n".join(
            [daily_timeline._desc_() for daily_timeline in self.daily_timelines]
        )

    def _preview_(self, preview_num: int = 0) -> str:
        """Generates a preview of the monthly timeline.

        Args:
            preview_num: Number of daily timelines to include in the preview.

        Returns:
            str: Preview string of the monthly timeline.
        """
        preview_statement = f"[{self.month_date}] {self.title}\n"
        for daily_timeline in self.daily_timelines[:preview_num]:
            preview_statement += daily_timeline._desc_() + "\n"
        return preview_statement

    def to_dict(self) -> Dict[str, Any]:
        """Converts the MonthlyTimeline object to a dictionary.

        Returns:
            Dict[str, Any]: Dictionary representation of the MonthlyTimeline.
        """
        return {
            "id": self.id,
            "monthDate": self.month_date,
            "title": self.title,
            "dailyTimelines": [
                daily_timeline.to_dict() for daily_timeline in self.daily_timelines
            ],
        }


class EntityWiki:
    def __init__(self, wikiText: str, monthlyTimelines: List[Dict[str, Any]]):
        self.wiki_text = wikiText
        self.monthly_timelines = [
            MonthlyTimeline(**monthly_timeline) for monthly_timeline in monthlyTimelines
        ]
        self.max_month_idx = (
            max([monthly_timeline.id for monthly_timeline in self.monthly_timelines])
            if self.monthly_timelines
            else 0
        )

    def to_dict(self) -> Dict[str, Any]:
        """Converts the EntityWiki object to a dictionary.

        Returns:
            Dict[str, Any]: Dictionary representation of the EntityWiki.
        """
        return {
            "wikiText": self.wiki_text,
            "monthlyTimelines": [
                monthly_timeline.to_dict()
                for monthly_timeline in self.monthly_timelines
            ],
        }
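
# Illustrative payload shape accepted by EntityWiki (the inverse of to_dict();
# the field values below are made up for the example, and dateTime must match
# DATE_TIME_FORMAT):
#
#     EntityWiki(
#         wikiText="...",
#         monthlyTimelines=[
#             {
#                 "id": 1,
#                 "monthDate": "2024-01",
#                 "title": "January highlights",
#                 "dailyTimelines": [
#                     {
#                         "id": 1,
#                         "dateTime": "2024-01-31",
#                         "content": "Wrapped up the monthly review.",
#                         "noteIds": [101, 102],
#                     }
#                 ],
#             }
#         ],
#     )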


class L1Generator:
    def __init__(self):
        self.preferred_language = "English"
        self.bio_model_params = {
            "temperature": 0,
            "max_tokens": 2000,
            "top_p": 0,
            "frequency_penalty": 0,
            "seed": 42,
            "presence_penalty": 0,
            "timeout": 45,
        }
        self.user_llm_config_service = UserLLMConfigService()
        self.user_llm_config = self.user_llm_config_service.get_available_llm()
        if self.user_llm_config is None:
            self.client = None
            self.model_name = None
        else:
            self.client = OpenAI(
                api_key=self.user_llm_config.chat_api_key,
                base_url=self.user_llm_config.chat_endpoint,
            )
            self.model_name = self.user_llm_config.chat_model_name
        self._top_p_adjusted = False  # Flag to track if top_p has been adjusted

    def _fix_top_p_param(self, error_message: str) -> bool:
        """Fixes the top_p parameter if an API error indicates it's invalid.

        Some LLM providers don't accept top_p=0 and require values in specific
        ranges. This function checks if the error is related to top_p and
        adjusts it to 0.001, which is close enough to 0 to maintain
        deterministic behavior while satisfying API requirements.

        Args:
            error_message: Error message from the API response.

        Returns:
            bool: True if top_p was adjusted, False otherwise.
        """
        if not self._top_p_adjusted and "top_p" in error_message.lower():
            logger.warning(
                "Fixing top_p parameter from 0 to 0.001 to comply with model API requirements"
            )
            self.bio_model_params["top_p"] = 0.001
            self._top_p_adjusted = True
            return True
        return False
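
    # Example: a 400 response whose message mentions "top_p" (e.g. "invalid
    # top_p value") triggers the fix once: bio_model_params["top_p"] becomes
    # 0.001 and the method returns True. Any later call returns False, because
    # the adjustment is one-shot per L1Generator instance.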

    def _call_llm_with_retry(self, messages: List[Dict[str, str]], **kwargs) -> Any:
        """Calls the LLM API with an automatic retry for parameter adjustments.

        This function makes API calls to the language model while applying
        automatic parameter fixes when errors occur. If the API rejects the
        call due to an invalid top_p parameter, it adjusts the parameter value
        and retries the call once.

        Args:
            messages: List of messages for the API call.
            **kwargs: Additional parameters to pass to the API call.

        Returns:
            API response object from the language model.

        Raises:
            Exception: If the API call fails after all retries or for unrelated errors.
        """
        try:
            return self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                **self.bio_model_params,
                **kwargs,
            )
        except Exception as e:
            error_msg = str(e)
            logger.error(f"API Error: {error_msg}")
            # Try to fix the top_p parameter if the provider rejected it
            if (
                hasattr(e, "response")
                and hasattr(e.response, "status_code")
                and e.response.status_code == 400
            ):
                if self._fix_top_p_param(error_msg):
                    logger.info("Retrying LLM API call with adjusted top_p parameter")
                    return self.client.chat.completions.create(
                        model=self.model_name,
                        messages=messages,
                        **self.bio_model_params,
                        **kwargs,
                    )
            # Re-raise for unrelated errors or when no retry applies
            raise

    def __build_message(
        self, system_prompt: str, user_prompt: str, language: str
    ) -> List[Dict[str, str]]:
        """Builds the message list for an LLM API call.

        Args:
            system_prompt: System prompt content.
            user_prompt: User prompt content.
            language: Preferred language for the response.

        Returns:
            List[Dict[str, str]]: Formatted messages for the LLM.
        """
        raw_message = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        if language:
            raw_message.append(
                {
                    "role": "system",
                    "content": PREFER_LANGUAGE_SYSTEM_PROMPT.format(language=language),
                }
            )
        return raw_message
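
    # Resulting shape (when language is non-empty):
    #
    #     [
    #         {"role": "system", "content": system_prompt},
    #         {"role": "user", "content": user_prompt},
    #         {"role": "system", "content": PREFER_LANGUAGE_SYSTEM_PROMPT.format(language=language)},
    #     ]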

    def _global_bio_generate(self, global_bio: Bio) -> Bio:
        """Generates the global biography.

        Args:
            global_bio: Bio object to generate content for.

        Returns:
            Bio: Updated Bio object with generated content.
        """
        user_prompt = global_bio.to_str()
        system_prompt = GLOBAL_BIO_SYSTEM_PROMPT
        global_bio_message = self.__build_message(
            system_prompt, user_prompt, language=self.preferred_language
        )
        response = self._call_llm_with_retry(global_bio_message)
        third_perspective_result = response.choices[0].message.content
        global_bio.summary_third_view = third_perspective_result
        global_bio.content_third_view = global_bio.complete_content()
        global_bio = self._shift_perspective(global_bio)
        global_bio = self._assign_confidence_level(global_bio)
        return global_bio

    def _shift_perspective(self, global_bio: Bio) -> Bio:
        """Shifts the perspective of the biography to second person.

        Args:
            global_bio: Bio object to shift perspective for.

        Returns:
            Bio: Updated Bio object with shifted perspective.
        """
        system_prompt = COMMON_PERSPECTIVE_SHIFT_SYSTEM_PROMPT
        user_prompt = global_bio.summary_third_view
        shift_perspective_message = self.__build_message(
            system_prompt, user_prompt, language=self.preferred_language
        )
        response = self._call_llm_with_retry(shift_perspective_message)
        second_perspective_result = response.choices[0].message.content
        global_bio.summary_second_view = second_perspective_result
        global_bio.content_second_view = global_bio.complete_content(second_view=True)
        return global_bio

    def _assign_confidence_level(self, global_bio: Bio) -> Bio:
        """Assigns confidence levels to shades in the biography.

        Args:
            global_bio: Bio object to assign confidence levels to.

        Returns:
            Bio: Updated Bio object with confidence levels assigned.
        """
        level_n, interest_n = len(IMPORTANCE_TO_CONFIDENCE), len(global_bio.shades_list)
        level_list = [
            IMPORTANCE_TO_CONFIDENCE[level_n - int(i / interest_n * level_n)]
            for i in range(interest_n)
        ]
        for shade, level in zip(global_bio.shades_list, level_list):
            shade.confidence_level = level
        return global_bio
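
    # Worked example: with the 5 levels above and 3 shades, the keys are
    # 5 - int(i / 3 * 5) for i = 0, 1, 2, i.e. 5, 4, and 2, so the shades
    # receive VERY_HIGH, HIGH, and LOW confidence in order.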

    def gen_global_biography(
        self, old_profile: Bio, cluster_list: List[Cluster]
    ) -> Bio:
        """Generates the global biography of the user.

        Args:
            old_profile: Previous Bio object.
            cluster_list: List of clusters for reference.

        Returns:
            Bio: Updated global biography.
        """
        global_bio = deepcopy(old_profile)
        global_bio = self._global_bio_generate(global_bio)
        return global_bio

    def gen_shade_for_cluster(
        self,
        old_memory_list: List[Note],
        new_memory_list: List[Note],
        shade_info_list: List[ShadeInfo],
    ) -> Optional[ShadeInfo]:
        """Generates a shade for a cluster.

        Args:
            old_memory_list: List of previous notes.
            new_memory_list: List of new notes.
            shade_info_list: List of shade information.

        Returns:
            Optional[ShadeInfo]: The generated shade, if any.
        """
        shade_generator = ShadeGenerator()
        shade = shade_generator.generate_shade(
            old_memory_list=old_memory_list,
            new_memory_list=new_memory_list,
            shade_info_list=shade_info_list,
        )
        return shade

    def merge_shades(self, shade_info_list: List[ShadeMergeInfo]):
        """Merges multiple shades.

        Args:
            shade_info_list: List of shade merge information.

        Returns:
            Merged shade result.
        """
        shade_merger = ShadeMerger()
        return shade_merger.merge_shades(shade_info_list)

    def gen_status_biography(
        self, cur_time: str, notes: List[Note], todos: List[Todo], chats: List[Chat]
    ):
        """Generates the status biography of the user.

        Args:
            cur_time: Current time string (accepted but not currently forwarded
                to the underlying generator).
            notes: List of notes.
            todos: List of todos.
            chats: List of chats.

        Returns:
            Generated status biography.
        """
        status_bio_generator = StatusBioGenerator()
        return status_bio_generator.generate_status_bio(notes, todos, chats)

    def gen_topics_for_shades(
        self,
        old_cluster_list: List[Cluster],
        old_outlier_memory_list: List[Memory],
        new_memory_list: List[Memory],
        cophenetic_distance: float = 1.0,
        outlier_cutoff_distance: float = 0.5,
        cluster_merge_distance: float = 0.5,
    ):
        """Generates topics for shades.

        Args:
            old_cluster_list: List of previous clusters.
            old_outlier_memory_list: List of previous outlier memories.
            new_memory_list: List of new memories.
            cophenetic_distance: Distance threshold for cophenetic clustering.
            outlier_cutoff_distance: Distance threshold for outlier detection.
            cluster_merge_distance: Distance threshold for cluster merging.

        Returns:
            Generated topics for shades.
        """
        topics_generator = TopicsGenerator()
        return topics_generator.generate_topics_for_shades(
            old_cluster_list,
            old_outlier_memory_list,
            new_memory_list,
            cophenetic_distance,
            outlier_cutoff_distance,
            cluster_merge_distance,
        )

    def generate_topics(self, notes_list: List[Note]):
        """Generates topics from a list of notes.

        Args:
            notes_list: List of notes to generate topics from.

        Returns:
            Generated topics.
        """
        topics_generator = TopicsGenerator()
        return topics_generator.generate_topics(notes_list)
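

# Minimal usage sketch (an illustration, not part of the module: it assumes a
# chat LLM has already been configured through UserLLMConfigService, and that
# `old_profile` is an existing Bio and `clusters` a List[Cluster] produced
# elsewhere in the L1 pipeline):
#
#     generator = L1Generator()
#     if generator.client is not None:
#         updated_bio = generator.gen_global_biography(old_profile, clusters)
#         print(updated_bio.summary_second_view)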