from datetime import datetime
from typing import List, Optional

import numpy as np

from lpm_kernel.L1.bio import Note, Chunk, Bio, ShadeInfo, ShadeMergeInfo
from lpm_kernel.L1.l1_generator import L1Generator
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.file_data.document_service import document_service
from lpm_kernel.models.l1 import (
    L1Bio,
    L1GenerationResult,
    L1Version,
    GlobalBioDTO,
    StatusBioDTO,
)
from lpm_kernel.models.status_biography import StatusBiography
from lpm_kernel.configs.logging import get_train_process_logger

logger = get_train_process_logger()


def extract_notes_from_documents(documents) -> tuple[List[Note], list]:
    """Extract Note objects and a memory list from documents.

    Args:
        documents: Document list containing L0 data.

    Returns:
        tuple: (notes_list, memory_list)
            - notes_list: List of Note objects
            - memory_list: List of memory dictionaries for clustering
    """
    notes_list = []
    memory_list = []

    for doc in documents:
        doc_id = doc.get("id")
        doc_embedding = document_service.get_document_embedding(doc_id)
        chunks = document_service.get_document_chunks(doc_id)
        all_chunk_embeddings = document_service.get_chunk_embeddings_by_document_id(
            doc_id
        )

        if not doc_embedding:
            logger.warning(f"Document {doc_id} missing document embedding")
            continue
        if not chunks:
            logger.warning(f"Document {doc_id} missing chunks")
            continue
        if not all_chunk_embeddings:
            logger.warning(f"Document {doc_id} missing chunk embeddings")
            continue

        # Ensure create_time is in string format
        create_time = doc.get("create_time")
        if isinstance(create_time, datetime):
            create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")

        # Get document insight and summary
        insight_data = doc.get("insight", {})
        summary_data = doc.get("summary", {})
        if insight_data is None:
            insight_data = {}
        if summary_data is None:
            summary_data = {}

        # Build Note object
        note = Note(
            noteId=doc_id,
            content=doc.get("raw_content", ""),
            createTime=create_time,
            memoryType="TEXT",
            embedding=np.array(doc_embedding),
            chunks=[
                Chunk(
                    id=f"{chunk.id}",
                    document_id=doc_id,
                    content=chunk.content,
                    embedding=np.array(all_chunk_embeddings.get(chunk.id))
                    if all_chunk_embeddings.get(chunk.id)
                    else None,
                    tags=chunk.tags if hasattr(chunk, "tags") else None,
                    topic=chunk.topic if hasattr(chunk, "topic") else None,
                )
                for chunk in chunks
                if all_chunk_embeddings.get(chunk.id)
            ],
            title=insight_data.get("title", ""),
            summary=summary_data.get("summary", ""),
            insight=insight_data.get("insight", ""),
            tags=summary_data.get("keywords", []),
        )
        notes_list.append(note)
        memory_list.append({"memoryId": str(doc_id), "embedding": doc_embedding})

    return notes_list, memory_list
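
# Illustrative sketch (not part of the module's API): the document dict shape this
# function expects and the memory entries it emits, inferred from the field accesses
# above. All values below are hypothetical placeholders.
#
#     doc = {
#         "id": 42,
#         "raw_content": "Meeting notes about the Q3 roadmap ...",
#         "create_time": datetime(2024, 5, 1, 9, 30),  # or a preformatted string
#         "insight": {"title": "Q3 roadmap", "insight": "..."},
#         "summary": {"summary": "...", "keywords": ["roadmap", "planning"]},
#     }
#     notes, memories = extract_notes_from_documents([doc])
#     # memories -> [{"memoryId": "42", "embedding": <document embedding>}]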


def generate_l1_from_l0() -> Optional[L1GenerationResult]:
    """Generate the L1-level knowledge representation from L0 data.

    Returns:
        Optional[L1GenerationResult]: Generation result, or None if no valid
        documents are available for processing.
    """
    l1_generator = L1Generator()

    # 1. Prepare data
    documents = document_service.list_documents_with_l0()
    logger.info(f"Found {len(documents)} documents with L0 data")

    # 2. Extract notes and memories
    notes_list, memory_list = extract_notes_from_documents(documents)
    if not notes_list or not memory_list:
        logger.error("No valid documents found for processing")
        return None

    try:
        # 3. Generate L1 data
        # 3.1 Generate topics
        clusters = l1_generator.gen_topics_for_shades(
            old_cluster_list=[], old_outlier_memory_list=[], new_memory_list=memory_list
        )
        logger.info(f"Generated clusters: {bool(clusters)}")

        # 3.2 Generate chunk topics
        chunk_topics = l1_generator.generate_topics(notes_list)
        logger.info(f"Generated chunk topics: {bool(chunk_topics)}")
        logger.info(f"chunk_topics content: {chunk_topics}")

        # 3.3 Generate a shade for each cluster and merge them
        shades = generate_shades(clusters, l1_generator, notes_list)
        shades_merge_infos = convert_from_shades_to_merge_info(shades)
        logger.info(f"Generated {len(shades)} shades")
        merged_shades = l1_generator.merge_shades(shades_merge_infos)
        logger.info(f"Merged shades success: {merged_shades.success}")
        logger.info(
            f"Number of merged shades: {len(merged_shades.merge_shade_list) if merged_shades.success else 0}"
        )

        # 3.4 Generate global biography
        bio = l1_generator.gen_global_biography(
            old_profile=Bio(
                shadesList=merged_shades.merge_shade_list
                if merged_shades.success
                else []
            ),
            cluster_list=clusters.get("clusterList", []),
        )
        logger.info(f"Generated global biography: {bio}")

        # 4. Build result object
        result = L1GenerationResult(
            bio=bio, clusters=clusters, chunk_topics=chunk_topics
        )
        logger.info("L1 generation completed successfully")
        return result
    except Exception as e:
        logger.error(f"Error in L1 generation: {str(e)}", exc_info=True)
        raise
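
# Minimal usage sketch, assuming the L0 pipeline has already populated documents
# and embeddings for document_service; the attribute names mirror the fields
# passed to L1GenerationResult above.
#
#     result = generate_l1_from_l0()
#     if result is not None:
#         global_bio = result.bio             # merged global biography
#         clusters = result.clusters          # {"clusterList": [...], ...}
#         chunk_topics = result.chunk_topics  # per-chunk topic assignments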


def generate_shades(clusters, l1_generator, notes_list):
    """Generate a shade for each cluster by matching cluster memories to notes.

    Args:
        clusters: Cluster dict produced by gen_topics_for_shades (expects a
            "clusterList" key).
        l1_generator: L1Generator instance used to build shades.
        notes_list: List of Note objects extracted from documents.

    Returns:
        list: Generated shade objects, one per non-empty cluster.
    """
    shades = []
    if clusters and "clusterList" in clusters:
        for cluster in clusters.get("clusterList", []):
            cluster_memory_ids = [
                str(m.get("memoryId")) for m in cluster.get("memoryList", [])
            ]
            logger.info(
                f"Processing cluster with {len(cluster_memory_ids)} memories"
            )
            cluster_notes = [
                note for note in notes_list if str(note.id) in cluster_memory_ids
            ]
            if cluster_notes:
                shade = l1_generator.gen_shade_for_cluster([], cluster_notes, [])
                if shade:
                    shades.append(shade)
                    logger.info(
                        f"Generated shade for cluster: {shade.name if hasattr(shade, 'name') else 'Unknown'}"
                    )
    return shades
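
# Illustrative sketch of the cluster structure this function iterates over,
# inferred from the key accesses above (ids and counts are hypothetical):
#
#     clusters = {
#         "clusterList": [
#             {"memoryList": [{"memoryId": "42"}, {"memoryId": "43"}]},
#             {"memoryList": [{"memoryId": "44"}]},
#         ]
#     }
#     shades = generate_shades(clusters, L1Generator(), notes_list)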


def convert_from_shades_to_merge_info(shades: List[ShadeInfo]) -> List[ShadeMergeInfo]:
    """Convert ShadeInfo objects into ShadeMergeInfo objects for shade merging."""
    return [
        ShadeMergeInfo(
            id=shade.id,
            name=shade.name,
            aspect=shade.aspect,
            icon=shade.icon,
            desc_third_view=shade.desc_third_view,
            content_third_view=shade.content_third_view,
            desc_second_view=shade.desc_second_view,
            content_second_view=shade.content_second_view,
            cluster_info=None,
        )
        for shade in shades
    ]


def store_status_bio(status_bio: Bio) -> None:
    """Store the status biography in the database.

    Args:
        status_bio (Bio): Generated status biography object.
    """
    try:
        with DatabaseSession.session() as session:
            # Delete the old status biography (if it exists)
            session.query(StatusBiography).delete()

            # Insert the new status biography
            new_bio = StatusBiography(
                content=status_bio.content_second_view,
                content_third_view=status_bio.content_third_view,
                summary=status_bio.summary_second_view,
                summary_third_view=status_bio.summary_third_view,
            )
            session.add(new_bio)
            session.commit()
    except Exception as e:
        logger.error(f"Error storing status biography: {str(e)}", exc_info=True)
        raise


def get_latest_status_bio() -> Optional[StatusBioDTO]:
    """Get the latest status biography.

    Returns:
        Optional[StatusBioDTO]: Data transfer object for the status biography,
        or None if not found.
    """
    try:
        with DatabaseSession.session() as session:
            # Get the latest status biography
            latest_bio = (
                session.query(StatusBiography)
                .order_by(StatusBiography.create_time.desc())
                .first()
            )
            if not latest_bio:
                return None

            # Convert to DTO and return
            return StatusBioDTO.from_model(latest_bio)
    except Exception as e:
        logger.error(f"Error getting status biography: {str(e)}", exc_info=True)
        return None


def get_latest_global_bio() -> Optional[GlobalBioDTO]:
    """Get the latest global biography.

    Returns:
        Optional[GlobalBioDTO]: Data transfer object for the global biography,
        or None if not found.
    """
    try:
        with DatabaseSession.session() as session:
            # Get the latest version of the L1 data
            latest_version = (
                session.query(L1Version).order_by(L1Version.version.desc()).first()
            )
            if not latest_version:
                return None

            # Get the bio data for this version
            bio = (
                session.query(L1Bio)
                .filter(L1Bio.version == latest_version.version)
                .first()
            )
            if not bio:
                return None

            # Convert to DTO and return
            return GlobalBioDTO.from_model(bio)
    except Exception as e:
        logger.error(f"Error getting global biography: {str(e)}", exc_info=True)
        return None


def generate_and_store_status_bio() -> Optional[Bio]:
    """Generate and store the status biography.

    Returns:
        Optional[Bio]: Generated status biography object, or None if no valid
        notes were available.
    """
    # Generate the status biography
    status_bio = generate_status_bio()

    if status_bio:
        # Store it in the database
        store_status_bio(status_bio)

    return status_bio


def generate_status_bio() -> Optional[Bio]:
    """Generate the status biography.

    Returns:
        Optional[Bio]: Generated status biography, or None if no valid notes
        were found.
    """
    l1_generator = L1Generator()

    try:
        # 1. Get all documents and extract notes
        documents = document_service.list_documents_with_l0()
        notes_list, _ = extract_notes_from_documents(documents)
        if not notes_list:
            logger.error("No valid notes found for status bio generation")
            return None

        # 2. Generate the status biography
        # Currently only notes are used; todos and chats are empty lists for now
        current_time = datetime.now().strftime("%Y-%m-%d")
        status_bio = l1_generator.gen_status_biography(
            cur_time=current_time,
            notes=notes_list,
            todos=[],  # Empty for now
            chats=[],  # Empty for now
        )
        logger.info("Status biography generated successfully")
        return status_bio
    except Exception as e:
        logger.error(f"Error generating status bio: {str(e)}", exc_info=True)
        raise
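
# Minimal end-to-end sketch for the status-biography flow, assuming documents with
# L0 data already exist (only functions defined in this module are used):
#
#     bio = generate_and_store_status_bio()  # generates and persists, may be None
#     dto = get_latest_status_bio()          # reads the persisted row back as a DTO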