# lpm_kernel/file_data/document_service.py
from pathlib import Path
from typing import List, Dict, Optional
import os
from sqlalchemy import select
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.common.repository.vector_store_factory import VectorStoreFactory
from lpm_kernel.file_data.document_dto import DocumentDTO, CreateDocumentRequest
from lpm_kernel.file_data.exceptions import FileProcessingError
from lpm_kernel.kernel.l0_base import InsightKernel, SummaryKernel
from lpm_kernel.models.memory import Memory
from .document import Document
from .document_repository import DocumentRepository
from .dto.chunk_dto import ChunkDTO
from .embedding_service import EmbeddingService
from .process_factory import ProcessorFactory
from .process_status import ProcessStatus
from lpm_kernel.configs.logging import get_train_process_logger
logger = get_train_process_logger()
class DocumentService:
def __init__(self):
self._repository = DocumentRepository()
self._insight_kernel = InsightKernel()
self._summary_kernel = SummaryKernel()
self.vector_store = VectorStoreFactory.get_instance()
self.embedding_service = EmbeddingService()
def create_document(self, data: CreateDocumentRequest) -> Document:
"""
Create a new document record.
Args:
data (CreateDocumentRequest): request describing the document to create
Returns:
Document: the persisted document object
"""
doc = Document(
name=data.name,
title=data.title,
mime_type=data.mime_type,
user_description=data.user_description,
url=str(data.url) if data.url else None,
document_size=data.document_size,
extract_status=data.extract_status,
embedding_status=ProcessStatus.INITIALIZED,
raw_content=data.raw_content,
)
return self._repository.create(doc)
def list_documents(self) -> List[Document]:
"""
List all documents.
Returns:
List[Document]: all document objects
"""
return self._repository.list()
def scan_directory(
self, directory_path: str, recursive: bool = False
) -> List[DocumentDTO]:
"""
Scan a directory and process every file found in it.
Args:
directory_path (str): directory to scan
recursive (bool, optional): whether to scan subdirectories recursively. Defaults to False.
Returns:
List[DocumentDTO]: DTOs of the documents that were processed and saved
Raises:
FileProcessingError: if the path is not a directory
"""
path = Path(directory_path)
if not path.is_dir():
raise FileProcessingError(f"{directory_path} is not a directory")
documents_dtos: List[DocumentDTO] = []
pattern = "**/*" if recursive else "*"
# list all files
files = list(path.glob(pattern))
logger.info(f"Found files: {files}")
for file_path in files:
if file_path.is_file():
try:
logger.info(f"Processing file: {file_path}")
doc = ProcessorFactory.auto_detect_and_process(str(file_path))
# build a CreateDocumentRequest and persist it
request = CreateDocumentRequest(
name=doc.name,
title=doc.name,
mime_type=doc.mime_type,
user_description="Auto scanned document",
document_size=doc.document_size,
url=str(file_path.absolute()),
raw_content=doc.raw_content,
extract_status=doc.extract_status,
embedding_status=ProcessStatus.INITIALIZED,
)
saved_doc = self.create_document(request)
documents_dtos.append(saved_doc.to_dto())
logger.info(f"Successfully processed and saved: {file_path}")
except Exception:
# log the full traceback and continue with the next file
logger.exception(f"Error processing file {file_path}")
continue
logger.info(f"Total documents processed and saved: {len(documents_dtos)}")
return documents_dtos
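# Usage sketch, kept as a comment because this sits inside the class body; it
# assumes the module-level `document_service` instance defined at the bottom of
# this file, and the directory path is purely illustrative:
#
#     dtos = document_service.scan_directory("/data/uploads", recursive=True)
#     logger.info(f"Scanned and saved {len(dtos)} documents")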
def _analyze_document(self, doc: DocumentDTO) -> DocumentDTO:
"""
Analyze a single document: generate its insight and summary and persist them.
Args:
doc (DocumentDTO): document to analyze
Returns:
DocumentDTO: the updated document
Raises:
Exception: if analysis or persistence fails
"""
try:
# generate insight
insight_result = self._insight_kernel.analyze(doc)
# generate summary
summary_result = self._summary_kernel.analyze(
doc, insight_result["insight"]
)
# update database
updated_doc = self._repository.update_document_analysis(
doc.id, insight_result, summary_result
)
return updated_doc
except Exception as e:
logger.error(f"Document {doc.id} analysis failed: {str(e)}", exc_info=True)
# update status as failed
self._update_analyze_status_failed(doc.id)
raise
def analyze_document(self, document_id: int) -> DocumentDTO:
"""
Analyze a single document by ID
Args:
document_id (int): ID of document to analyze
Returns:
DocumentDTO: The analyzed document
Raises:
ValueError: If document not found
Exception: If analysis fails
"""
try:
# Get document
document = self._repository.find_one(document_id)
if not document:
raise ValueError(f"Document not found with id: {document_id}")
# Perform analysis
return self._analyze_document(document)
except ValueError as e:
logger.error(f"Document {document_id} not found: {str(e)}")
raise
except Exception as e:
logger.error(f"Error analyzing document {document_id}: {str(e)}", exc_info=True)
self._update_analyze_status_failed(document_id)
raise
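# Usage sketch (the document ID is illustrative; it assumes the document was
# previously created via create_document or scan_directory):
#
#     try:
#         dto = document_service.analyze_document(42)
#     except ValueError:
#         logger.warning("Document 42 does not exist")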
def _update_analyze_status_failed(self, doc_id: int) -> None:
"""update status as failed"""
try:
with self._repository._db.session() as session:
document = session.get(self._repository.model, doc_id)
if document:
document.analyze_status = ProcessStatus.FAILED
session.commit()
logger.debug(f"Updated analyze status for document {doc_id} to FAILED")
else:
logger.warning(f"Document not found with id: {doc_id}")
except Exception as e:
logger.error(f"Error updating document analyze status: {str(e)}")
def check_all_documents_embeding_status(self) -> bool:
"""
Check if there are any documents that need embedding
Returns:
bool: True if there are documents that need embedding, False otherwise
"""
try:
unembedding_docs = self._repository.find_unembedding()
return len(unembedding_docs) > 0
except Exception as e:
logger.error(f"Error checking documents embedding status: {str(e)}", exc_info=True)
raise
def analyze_all_documents(self) -> List[DocumentDTO]:
"""
Analyze all documents that have not been analyzed yet.
Returns:
List[DocumentDTO]: the documents that were analyzed successfully
Raises:
Exception: if the batch analysis itself fails
"""
try:
# get all unanalyzed documents
unanalyzed_docs = self._repository.find_unanalyzed()
analyzed_docs = []
success_count = 0
error_count = 0
for index, doc in enumerate(unanalyzed_docs, 1):
try:
analyzed_doc = self._analyze_document(doc)
analyzed_docs.append(analyzed_doc)
success_count += 1
except Exception as e:
error_count += 1
logger.error(f"Document {doc.id} processing failed: {str(e)}")
continue
logger.info(f"Batch analysis finished: {success_count} succeeded, {error_count} failed")
return analyzed_docs
except Exception as e:
logger.error(f"Error occurred during batch analysis: {str(e)}", exc_info=True)
raise
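# Batch usage sketch: analyze everything that is still unanalyzed, then check
# whether any documents still need embeddings (both methods belong to this class):
#
#     analyzed = document_service.analyze_all_documents()
#     if document_service.check_all_documents_embeding_status():
#         logger.info("Some documents still need embeddings")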
def get_document_l0(self, document_id: int) -> Dict:
"""
Get a document's L0 data: its chunks together with their embeddings.
Args:
document_id (int): document ID
Returns:
Dict: in the format:
{
"document_id": int,
"chunks": List[Dict],
"total_chunks": int
}
Raises:
FileProcessingError: if the document does not exist or L0 data cannot be built
"""
try:
# get doc
document = self._repository.find_one(document_id)
if not document:
raise FileProcessingError(f"Document not found: {document_id}")
# get doc chunks
chunks = self.get_document_chunks(document_id)
if not chunks:
return {"document_id": document_id, "chunks": [], "total_chunks": 0}
# get doc embeddings
all_chunk_embeddings = self.get_chunk_embeddings_by_document_id(document_id)
# get L0 data
l0_data = {
"document_id": document_id,
"chunks": [
{
"id": chunk.id,
"content": chunk.content,
"has_embedding": chunk.has_embedding,
"embedding": all_chunk_embeddings.get(chunk.id),
"tags": chunk.tags,
"topic": chunk.topic,
}
for chunk in chunks
],
"total_chunks": len(chunks),
}
return l0_data
except FileProcessingError as e:
raise e
except Exception as e:
logger.error(f"Error getting L0 data for document {document_id}: {str(e)}")
raise FileProcessingError(f"Failed to get L0 data: {str(e)}")
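# Consumption sketch for the dict returned above (the document ID is illustrative):
#
#     l0 = document_service.get_document_l0(42)
#     for chunk in l0["chunks"]:
#         if chunk["has_embedding"]:
#             logger.info(f"chunk {chunk['id']} has a {len(chunk['embedding'])}-dim embedding")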
def get_document_chunks(self, document_id: int) -> List[ChunkDTO]:
"""
Get a document's chunks.
Args:
document_id (int): document ID
Returns:
List[ChunkDTO]: the document's chunks; each ChunkDTO carries its embedding when one exists
"""
try:
document = self._repository.find_one(document_id=document_id)
if not document:
logger.info(f"Document not found with id: {document_id}")
return []
chunks = self._repository.find_chunks(document_id=document_id)
logger.info(f"Found {len(chunks)} chunks for document {document_id}")
for chunk in chunks:
chunk.length = len(chunk.content) if chunk.content else 0
if chunk.has_embedding:
chunk.embedding = (
self.embedding_service.get_chunk_embedding_by_chunk_id(chunk.id)
)
return chunks
except Exception as e:
logger.error(f"Error getting chunks for document {document_id}: {str(e)}")
return []
# def save_chunk(self, chunk: Chunk) -> None:
# """
# Args:
# chunk (Chunk): chunk obj
# Raises:
# Exception: error occurred
# """
# try:
# # create ChunkModel instance
# chunk_model = ChunkModel(
# document_id=chunk.document_id,
# content=chunk.content,
# tags=chunk.tags,
# topic=chunk.topic,
# )
# # save to db
# self._repository.save_chunk(chunk_model)
# logger.debug(f"Saved chunk for document {chunk.document_id}")
# except Exception as e:
# logger.error(f"Error saving chunk: {str(e)}")
# raise
def list_documents_with_l0(self) -> List[Dict]:
"""
List all documents together with their L0 data.
Returns:
List[Dict]: one dict per document, each with an "l0_data" entry (None if retrieval failed)
"""
# 1. get all basic data
documents = self.list_documents()
logger.info(f"list_documents len: {len(documents)}")
# 2. attach each document's L0 data
documents_with_l0 = []
for doc in documents:
doc_dict = doc.to_dict()
try:
l0_data = self.get_document_l0(doc.id)
doc_dict["l0_data"] = l0_data
logger.info(f"success getting L0 data for document {doc.id} success")
except Exception as e:
logger.error(f"Error getting L0 data for document {doc.id}: {str(e)}")
doc_dict["l0_data"] = None
documents_with_l0.append(doc_dict)
return documents_with_l0
def get_document_by_id(self, document_id: int) -> Optional[Document]:
"""
Get a document by its ID.
Args:
document_id (int): document ID
Returns:
Optional[Document]: the document, or None if not found or on error
"""
try:
return self._repository.find_one(document_id)
except Exception as e:
logger.error(f"Error getting document by id {document_id}: {str(e)}")
return None
def generate_document_chunk_embeddings(self, document_id: int) -> List[ChunkDTO]:
"""
Generate embeddings for all chunks of a document and update their status in the database.
Args:
document_id (int): document ID
Returns:
List[ChunkDTO]: the processed chunks
Raises:
Exception: if embedding generation or the status update fails
"""
try:
chunks_dtos = self._repository.find_chunks(document_id)
if not chunks_dtos:
logger.info(f"No chunks found for document {document_id}")
return []
# handle embeddings
processed_chunks = self.embedding_service.generate_chunk_embeddings(
chunks_dtos
)
# update state in db
for chunk_dto in processed_chunks:
if chunk_dto.has_embedding:
self._repository.update_chunk_embedding_status(chunk_dto.id, True)
return processed_chunks
except Exception as e:
logger.error(f"Error processing chunk embeddings: {str(e)}")
raise
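# Usage sketch: embed a document's chunks, then read the embeddings back
# (the document ID is illustrative):
#
#     chunks = document_service.generate_document_chunk_embeddings(42)
#     vectors = document_service.get_chunk_embeddings_by_document_id(42)
#     logger.info(f"Embedded {len(chunks)} chunks, fetched {len(vectors)} vectors")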
def get_chunk_embeddings_by_document_id(
self, document_id: int
) -> Dict[int, List[float]]:
"""
Get the embeddings of all chunks belonging to a document.
Args:
document_id (int): document ID
Returns:
Dict[int, List[float]]: mapping from chunk ID to its embedding vector
Raises:
Exception: if the embeddings cannot be fetched from the vector store
"""
try:
# get the IDs of all chunks belonging to this document
chunks = self._repository.find_chunks(document_id)
chunk_ids = [str(chunk.id) for chunk in chunks]
# get embeddings from ChromaDB
embeddings = {}
if chunk_ids:
results = self.embedding_service.chunk_collection.get(
ids=chunk_ids, include=["embeddings", "documents"]
)
# map chunk_id -> embedding
for i, chunk_id in enumerate(results["ids"]):
embeddings[int(chunk_id)] = results["embeddings"][i]
return embeddings
except Exception as e:
logger.error(
f"Error getting chunk embeddings for document {document_id}: {str(e)}"
)
raise
def process_document_embedding(self, document_id: int) -> Optional[List[float]]:
"""
Generate the document-level embedding and record its status.
Args:
document_id (int): document ID
Returns:
Optional[List[float]]: the document embedding, or None if the document has no content to embed
Raises:
ValueError: if the document does not exist
Exception: if embedding generation fails
"""
try:
document = self._repository.find_one(document_id)
if not document:
raise ValueError(f"Document not found with id: {document_id}")
if not document.raw_content:
logger.warning(
f"Document {document_id} has no content to process embedding"
)
self._repository.update_embedding_status(
document_id, ProcessStatus.FAILED
)
return None
# gen doc embedding
embedding = self.embedding_service.generate_document_embedding(document)
if embedding is not None:
self._repository.update_embedding_status(
document_id, ProcessStatus.SUCCESS
)
else:
self._repository.update_embedding_status(
document_id, ProcessStatus.FAILED
)
return embedding
except Exception as e:
logger.error(f"Error processing document embedding: {str(e)}")
self._repository.update_embedding_status(document_id, ProcessStatus.FAILED)
raise
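# Usage sketch: generate the document-level embedding and confirm it can be
# read back (the document ID is illustrative):
#
#     embedding = document_service.process_document_embedding(42)
#     if embedding is not None:
#         assert document_service.get_document_embedding(42) is not None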
def get_document_embedding(self, document_id: int) -> Optional[List[float]]:
"""
Get a document's embedding from the vector store.
Args:
document_id (int): document ID
Returns:
Optional[List[float]]: the document embedding, or None if it has not been generated
Raises:
Exception: if the lookup fails
"""
try:
results = self.embedding_service.document_collection.get(
ids=[str(document_id)], include=["embeddings"]
)
if results and results["embeddings"]:
return results["embeddings"][0]
return None
except Exception as e:
logger.error(f"Error getting document embedding: {str(e)}")
raise
def delete_file_by_name(self, filename: str) -> bool:
"""
Delete a file and all of its related records: memory entry, document, chunks, embeddings, and the physical file.
Args:
filename (str): name of the file to delete
Returns:
bool: True if deletion succeeded, False if the file record was not found
Raises:
Exception: if deletion fails partway through
"""
logger.info(f"Starting to delete file: {filename}")
try:
# 1. search memories
db = DatabaseSession()
memory = None
document_id = None
with db._session_factory() as session:
query = select(Memory).where(Memory.name == filename)
result = session.execute(query)
memory = result.scalar_one_or_none()
if not memory:
logger.warning(f"File record not found: {filename}")
return False
# get related document_id
document_id = memory.document_id
# get filepath
file_path = memory.path
# 2. delete memory
session.delete(memory)
session.commit()
logger.info(f"Deleted record from memories table: {filename}")
# if no related document, only delete physical file
if not document_id:
# delete physical file
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Deleted physical file: {file_path}")
return True
# 3. get doc obj
document = self._repository.get_by_id(document_id)
if not document:
logger.warning(f"Corresponding document record not found, ID: {document_id}")
# if no document record, delete physical file
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Deleted physical file: {file_path}")
return True
# 4. get all chunks
chunks = self._repository.find_chunks(document_id)
# 5. delete doc embedding from ChromaDB
try:
self.embedding_service.document_collection.delete(
ids=[str(document_id)]
)
logger.info(f"Deleted document embedding from ChromaDB, ID: {document_id}")
except Exception as e:
logger.error(f"Error deleting document embedding: {str(e)}")
# 6. delete all chunk embedding from ChromaDB
if chunks:
try:
chunk_ids = [str(chunk.id) for chunk in chunks]
self.embedding_service.chunk_collection.delete(
ids=chunk_ids
)
logger.info(f"Deleted {len(chunk_ids)} chunk embeddings from ChromaDB")
except Exception as e:
logger.error(f"Error deleting chunk embeddings: {str(e)}")
# 7. delete all chunk records from the database
with db._session_factory() as session:
from lpm_kernel.file_data.models import ChunkModel
session.query(ChunkModel).filter(
ChunkModel.document_id == document_id
).delete()
session.commit()
logger.info(f"Deleted all related chunks")
# delete doc record
doc_entity = session.get(Document, document_id)
if doc_entity:
session.delete(doc_entity)
session.commit()
logger.info(f"Deleted document record from database, ID: {document_id}")
# 8. delete physical file
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"Deleted physical file: {file_path}")
return True
except Exception as e:
logger.error(f"Error deleting file: {str(e)}", exc_info=True)
raise
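# Usage sketch (the filename is illustrative; deletion is irreversible):
#
#     if document_service.delete_file_by_name("notes.md"):
#         logger.info("notes.md and its related records were removed")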
def fix_missing_document_analysis(self) -> int:
"""Fix documents with missing insights or summaries
Returns:
int: Number of documents fixed
"""
try:
# Find all documents that have analysis issues
docs = self._repository.list()
fixed_count = 0
for doc in docs:
needs_fixing = False
# Check if document needs analysis
if not doc.analyze_status or doc.analyze_status != ProcessStatus.SUCCESS:
needs_fixing = True
logger.info(f"Document {doc.id} needs analysis (status: {doc.analyze_status})")
# Check if document has missing insights or summaries
elif not doc.insight or not doc.summary:
needs_fixing = True
logger.info(f"Document {doc.id} has missing insight or summary")
# Process documents that need fixing
if needs_fixing:
try:
# Process document analysis
self.analyze_document(doc.id)
fixed_count += 1
logger.info(f"Fixed document {doc.id} analysis")
except Exception as e:
logger.error(f"Error fixing document {doc.id} analysis: {str(e)}")
logger.info(f"Fixed {fixed_count} documents with missing analysis")
return fixed_count
except Exception as e:
logger.error(f"Error in fix_missing_document_analysis: {str(e)}")
raise FileProcessingError(f"Failed to fix document analysis: {str(e)}")
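# Usage sketch: re-run analysis for anything missing an insight or summary:
#
#     fixed = document_service.fix_missing_document_analysis()
#     logger.info(f"Repaired {fixed} documents")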
def verify_document_embeddings(self, verbose=True) -> Dict:
"""
Verify all document embeddings and return statistics
Args:
verbose (bool): Whether to log detailed information
Returns:
Dict: Statistics about document embeddings
"""
try:
docs = self._repository.list()
results = {
"total_documents": len(docs),
"documents_with_embedding": 0,
"documents_without_embedding": 0,
"documents_with_content": 0,
"documents_without_content": 0,
"documents_with_summary": 0,
"documents_without_summary": 0,
"documents_with_insight": 0,
"documents_without_insight": 0,
"documents_needing_repair": 0,
}
documents_needing_repair = []
for doc in docs:
# Check if document has content
if doc.raw_content:
results["documents_with_content"] += 1
else:
results["documents_without_content"] += 1
# Check if document has summary
if doc.summary:
results["documents_with_summary"] += 1
else:
results["documents_without_summary"] += 1
# Check if document has insight
if doc.insight:
results["documents_with_insight"] += 1
else:
results["documents_without_insight"] += 1
# Check if embeddings exist in ChromaDB
embedding = self.get_document_embedding(doc.id)
if embedding is not None:
results["documents_with_embedding"] += 1
if verbose:
logger.info(f"Document {doc.id}: '{doc.name}' has embedding of dimension {len(embedding)}")
else:
results["documents_without_embedding"] += 1
if verbose:
logger.warning(f"Document {doc.id}: '{doc.name}' missing embedding")
# Check if document needs repair (has content but missing embedding or analysis)
if doc.raw_content and (embedding is None or not doc.summary or not doc.insight):
documents_needing_repair.append(doc.id)
results["documents_needing_repair"] += 1
# Log statistics
logger.info(f"Document embedding verification results: {results}")
if documents_needing_repair and verbose:
logger.info(f"Documents needing repair: {documents_needing_repair}")
return results
except Exception as e:
logger.error(f"Error verifying document embeddings: {str(e)}", exc_info=True)
raise
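# Usage sketch: verify embeddings quietly and repair only when something is missing:
#
#     stats = document_service.verify_document_embeddings(verbose=False)
#     if stats["documents_needing_repair"]:
#         document_service.fix_missing_document_analysis()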
# create service
document_service = DocumentService()
# use elsewhere by:
# from lpm_kernel.file_data.document_service import document_service
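# End-to-end sketch of the usual pipeline, kept as a comment. It assumes the
# database and the ChromaDB collections used by EmbeddingService are already
# initialized, and the directory path is illustrative:
#
#     from lpm_kernel.file_data.document_service import document_service
#
#     docs = document_service.scan_directory("/data/uploads", recursive=True)
#     document_service.analyze_all_documents()
#     for doc in docs:
#         document_service.generate_document_chunk_embeddings(doc.id)
#         document_service.process_document_embedding(doc.id)
#     document_service.verify_document_embeddings()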