Skip to content

Commit

Permalink
Generate deterministic document ids
Browse files Browse the repository at this point in the history
  • Loading branch information
mawandm committed Aug 16, 2024
1 parent d6135df commit 724afc3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
19 changes: 11 additions & 8 deletions nesis/api/core/document_loaders/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import queue
import tempfile
import uuid
from typing import Dict, Any, Optional

import memcache
Expand Down Expand Up @@ -196,15 +197,17 @@ def _sync_document(
Here we check if this file has been updated.
If the file has been updated, we delete it from the vector store and re-ingest the new updated file
"""
document: Document = get_document(document_id=item.etag)
document_id = None if document is None else document.uuid
document_id = str(
uuid.uuid5(uuid.NAMESPACE_DNS, f"{datasource.uuid}/{item.etag}")
)
document: Document = get_document(document_id=document_id)

for _ingest_runner in self._ingest_runners:
try:
response_json = _ingest_runner.run(
file_path=file_path,
metadata=metadata,
document_id=document_id,
document_id=None if document is None else document.uuid,
last_modified=item.last_modified.replace(tzinfo=None).replace(
microsecond=0
),
Expand All @@ -214,22 +217,22 @@ def _sync_document(
_LOG.warning(f"File {file_path} ingestion failed", exc_info=True)
response_json = None
except UserWarning:
_LOG.debug(f"File {file_path} is already processing")
return
_LOG.warning(f"File {file_path} is already processing")
continue

if response_json is None:
return
_LOG.warning("No response from ingest runner received")
continue

_ingest_runner.save(
document_id=item.etag,
document_id=document_id,
datasource_id=datasource.uuid,
filename=item.object_name,
base_uri=endpoint,
rag_metadata=response_json,
store_metadata={
"bucket_name": item.bucket_name,
"object_name": item.object_name,
"etag": item.etag,
"size": item.size,
"last_modified": item.last_modified.strftime(
DEFAULT_DATETIME_FORMAT
Expand Down
4 changes: 3 additions & 1 deletion nesis/api/core/document_loaders/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ def run(
) -> Union[Dict[str, Any], None]:

if document_id is not None:
_LOG.debug(f"Checking if document {document_id} is modified")
_is_modified = self._is_modified(
document_id=document_id, last_modified=last_modified
)
if _is_modified is None or not _is_modified:
_LOG.debug(f"Document {document_id} is not modified")
return

url = f"{self._rag_endpoint}/v1/extractions/text"

_LOG.debug(f"Document {document_id} is modified, performing extraction")
response = self._http_client.upload(
url=url,
filepath=file_path,
Expand Down
8 changes: 6 additions & 2 deletions nesis/api/tests/core/document_loaders/test_minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,14 @@ def test_update_ingest_documents(
session.commit()

# The document record

document = Document(
base_uri="http://localhost:4566",
document_id="d41d8cd98f00b204e9800998ecf8427e",
document_id=str(
uuid.uuid5(
uuid.NAMESPACE_DNS,
f"{datasource.uuid}/d41d8cd98f00b204e9800998ecf8427e",
)
),
filename="invalid.pdf",
rag_metadata={"data": [{"doc_id": str(uuid.uuid4())}]},
store_metadata={
Expand Down

0 comments on commit 724afc3

Please sign in to comment.