from typing import List, Optional, Dict from sqlalchemy import select from lpm_kernel.common.repository.base_repository import BaseRepository from lpm_kernel.file_data.document import Document from lpm_kernel.file_data.process_status import ProcessStatus from lpm_kernel.file_data.document_dto import DocumentDTO from lpm_kernel.file_data.models import ChunkModel, DocumentModel from .dto.chunk_dto import ChunkDTO import logging logger = logging.getLogger(__name__) class DocumentRepository(BaseRepository[Document]): def __init__(self): super().__init__(Document) def update_document_analysis( self, doc_id: int, insight: Dict, summary: Dict ) -> Optional[DocumentDTO]: """update doc's insight and summary""" with self._db.session() as session: document = session.get(self.model, doc_id) if document: document.insight = insight document.summary = summary document.analyze_status = ProcessStatus.SUCCESS session.commit() return Document.to_dto(document) return None def find_unanalyzed(self) -> List[DocumentDTO]: """search unanalyzed doc according to analyze_status""" with self._db.session() as session: query = select(self.model).where( self.model.analyze_status.in_([ProcessStatus.INITIALIZED, ProcessStatus.FAILED]) ) result = session.execute(query) return [Document.to_dto(doc) for doc in result.scalars().all()] def find_chunks(self, document_id: int) -> List[ChunkDTO]: """search all chunks of the specified document""" with self._db.session() as session: chunks = ( session.query(ChunkModel) .filter(ChunkModel.document_id == document_id) .all() ) return [ ChunkDTO( id=chunk.id, document_id=chunk.document_id, has_embedding=chunk.has_embedding, # embedding=chunk.embedding, length=len(chunk.content) if chunk.content else 0, content=chunk.content, tags=chunk.tags, topic=chunk.topic, ) for chunk in chunks ] def save_chunk(self, chunk: ChunkModel) -> ChunkModel: """save chunk""" with self._db.session() as session: session.add(chunk) session.flush() # get auto-gen ID session.refresh(chunk) return chunk def find_one(self, document_id: int) -> Optional[DocumentDTO]: """search doc by id""" with self._db.session() as session: document = session.get(self.model, document_id) return Document.to_dto(document) if document else None def update_chunk_embedding_status(self, chunk_id: int, has_embedding: bool) -> None: """update chunk embedding""" try: with self._db.session() as session: chunk = ( session.query(ChunkModel).filter(ChunkModel.id == chunk_id).first() ) if chunk: chunk.has_embedding = has_embedding session.commit() logger.debug(f"Updated embedding status for chunk {chunk_id}") else: logger.warning(f"Chunk not found with id: {chunk_id}") except Exception as e: logger.error(f"Error updating chunk embedding status: {str(e)}") raise def find_unembedding(self) -> List[DocumentDTO]: """search unembedding documents according to embedding_status""" with self._db.session() as session: query = select(self.model).where( self.model.embedding_status.in_([ProcessStatus.INITIALIZED, ProcessStatus.FAILED]) ) result = session.execute(query) return [Document.to_dto(doc) for doc in result.scalars().all()] def update_embedding_status(self, document_id: int, status: ProcessStatus) -> None: """update doc embedding""" try: with self._db.session() as session: document = ( session.query(DocumentModel) .filter(DocumentModel.id == document_id) .first() ) if document: document.embedding_status = status.value session.commit() logger.debug( f"Updated embedding status for document {document_id} to {status.value}" ) else: logger.warning(f"Document not found with id: {document_id}") except Exception as e: logger.error(f"Error updating document embedding status: {str(e)}") raise