Support edit metadata #414

Merged · 7 commits · Feb 5, 2025
4 changes: 4 additions & 0 deletions lazyllm/tools/rag/chroma_store.py
@@ -43,6 +43,10 @@ def remove_nodes(self, group_name: str, uids: Optional[List[str]] = None) -> None:
self._db_client.delete_collection(name=group_name)
return self._map_store.remove_nodes(group_name, uids)

@override
def update_doc_meta(self, filepath: str, metadata: dict) -> None:
self._map_store.update_doc_meta(filepath, metadata)

@override
def get_nodes(self, group_name: str, uids: List[str] = None) -> List[DocNode]:
return self._map_store.get_nodes(group_name, uids)
7 changes: 7 additions & 0 deletions lazyllm/tools/rag/doc_impl.py
@@ -223,6 +223,13 @@ def add_reader(self, pattern: str, func: Optional[Callable] = None):

def worker(self):
while True:
# Apply meta changes
rows = self._dlm.fetch_docs_changed_meta(self._kb_group_name)
if rows:
for row in rows:
new_meta_dict = json.loads(row[1]) if row[1] else {}
self.store.update_doc_meta(row[0], new_meta_dict)

docs = self._dlm.get_docs_need_reparse(group=self._kb_group_name)
if docs:
filepaths = [doc.path for doc in docs]
143 changes: 142 additions & 1 deletion lazyllm/tools/rag/doc_manager.py
@@ -1,6 +1,6 @@
import os
import json
from typing import List, Optional, Dict
from typing import List, Optional, Dict, Union
from pydantic import BaseModel, Field

from starlette.responses import RedirectResponse
@@ -213,5 +213,146 @@ def delete_files_from_group(self, request: FileGroupRequest):
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

class AddMetadataRequest(BaseModel):
doc_ids: List[str]
kv_pair: Dict[str, Union[bool, int, float, str, list]]

@app.post("/add_metadata")
def add_metadata(self, add_metadata_request: AddMetadataRequest):
doc_ids = add_metadata_request.doc_ids
kv_pair = add_metadata_request.kv_pair
try:
docs = self._manager.get_docs(doc_ids)
if not docs:
return BaseResponse(code=400, msg="Failed, no doc found")
doc_meta = {}
for doc in docs:
meta_dict = json.loads(doc.meta) if doc.meta else {}
for k, v in kv_pair.items():
if k not in meta_dict or not meta_dict[k]:
meta_dict[k] = v
Contributor:
This logic is problematic: suppose I add two tags, a and b, one after the other. The first add assigns the value a, and then the second add reports an error.

Contributor:
I suggest that when adding metadata, the request simply indicates whether the value is a list (using a name that fits the business, e.g. vector_values=True).

Contributor (Author):
To keep the design simple and easy to use, it was changed as follows.
Precondition: metadata content is flat, i.e. {key: val} where val never contains a nested dict.

When adding metadata, kv_pair is constrained to: Dict[str, Union[bool, int, float, str, list]]

  1. If k is new: assign v directly.
  2. If k already exists and meta[k] is a list: append the elements of v to meta[k].
  3. If k already exists and meta[k] is not a list: turn meta[k] into a list and append all the elements of v.

elif isinstance(meta_dict[k], list):
meta_dict[k].extend(v) if isinstance(v, list) else meta_dict[k].append(v)
else:
meta_dict[k] = ([meta_dict[k]] + v) if isinstance(v, list) else [meta_dict[k], v]
doc_meta[doc.doc_id] = meta_dict
self._manager.set_docs_new_meta(doc_meta)
return BaseResponse(data=None)
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)
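To make the merge rules described in the review thread above concrete, here is a minimal standalone sketch of the same logic applied to a plain dict; the helper name merge_metadata is illustrative and not part of the PR:

```python
def merge_metadata(meta_dict: dict, kv_pair: dict) -> dict:
    # Same rules as the add_metadata endpoint above, applied to a plain dict.
    for k, v in kv_pair.items():
        if k not in meta_dict or not meta_dict[k]:
            # Rule 1: new (or empty) key -> assign v directly.
            meta_dict[k] = v
        elif isinstance(meta_dict[k], list):
            # Rule 2: existing list -> append or extend with v.
            meta_dict[k].extend(v) if isinstance(v, list) else meta_dict[k].append(v)
        else:
            # Rule 3: existing scalar -> promote to a list, then add v.
            meta_dict[k] = ([meta_dict[k]] + v) if isinstance(v, list) else [meta_dict[k], v]
    return meta_dict

meta = {}
merge_metadata(meta, {"title": "title2"})   # {"title": "title2"}
merge_metadata(meta, {"title": "TITLE2"})   # {"title": ["title2", "TITLE2"]}
```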

class DeleteMetadataRequest(BaseModel):
doc_ids: List[str]
keys: Optional[List[str]] = Field(None)
kv_pair: Optional[Dict[str, Union[bool, int, float, str, list]]] = Field(None)

def _inplace_del_meta(self, meta_dict, kv_pair: Dict[str, Union[None, bool, int, float, str, list]]):
# alert: meta_dict is not a deepcopy
for k, v in kv_pair.items():
if k not in meta_dict:
continue
if v is None:
meta_dict.pop(k, None)
elif isinstance(meta_dict[k], list):
if isinstance(v, (bool, int, float, str)):
v = [v]
# delete v exists in meta_dict[k]
meta_dict[k] = list(set(meta_dict[k]) - set(v))
else:
# old meta[k] not a list, use v as condition to delete the key
if meta_dict[k] == v:
meta_dict.pop(k, None)

@app.post("/delete_metadata_item")
def delete_metadata_item(self, del_metadata_request: DeleteMetadataRequest):
doc_ids = del_metadata_request.doc_ids
kv_pair = del_metadata_request.kv_pair
keys = del_metadata_request.keys
Contributor:
What should I do if I want meta_dict[k].remove(v)?

Contributor (Author):
The delete_metadata_item interface has been updated to address this need.

try:
if keys is not None:
# convert keys to kv_pair
if kv_pair:
kv_pair.update({k: None for k in keys})
else:
kv_pair = {k: None for k in keys}
if not kv_pair:
# clear metadata
self._manager.set_docs_new_meta({doc_id: {} for doc_id in doc_ids})
else:
docs = self._manager.get_docs(doc_ids)
if not docs:
return BaseResponse(code=400, msg="Failed, no doc found")
doc_meta = {}
for doc in docs:
meta_dict = json.loads(doc.meta) if doc.meta else {}
self._inplace_del_meta(meta_dict, kv_pair)
doc_meta[doc.doc_id] = meta_dict
self._manager.set_docs_new_meta(doc_meta)
return BaseResponse(data=None)
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)
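A client-side illustration of the two deletion modes, mirroring the tests at the end of this PR; doc_server_addr and doc_id are placeholders, not values from the repo:

```python
import httpx

doc_server_addr = "http://127.0.0.1:8000"   # placeholder server address
doc_id = "some-doc-id"                      # placeholder document id

# keys: drop the listed keys entirely (here, "signature").
httpx.post(f"{doc_server_addr}/delete_metadata_item",
           json=dict(doc_ids=[doc_id], keys=["signature"]))

# kv_pair: remove matching values; if meta["title"] was ["title2", "TITLE2"],
# it becomes ["title2"].
httpx.post(f"{doc_server_addr}/delete_metadata_item",
           json=dict(doc_ids=[doc_id], kv_pair={"title": "TITLE2"}))
```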

class UpdateMetadataRequest(BaseModel):
doc_ids: List[str]
kv_pair: Dict[str, Union[bool, int, float, str, list]]

@app.post("/update_or_create_metadata_keys")
def update_or_create_metadata_keys(self, update_metadata_request: UpdateMetadataRequest):
doc_ids = update_metadata_request.doc_ids
kv_pair = update_metadata_request.kv_pair
try:
docs = self._manager.get_docs(doc_ids)
if not docs:
return BaseResponse(code=400, msg="Failed, no doc found")
for doc in docs:
doc_meta = {}
meta_dict = json.loads(doc.meta) if doc.meta else {}
for k, v in kv_pair.items():
meta_dict[k] = v
doc_meta[doc.doc_id] = meta_dict
self._manager.set_docs_new_meta(doc_meta)
return BaseResponse(data=None)
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

class ResetMetadataRequest(BaseModel):
doc_ids: List[str]
new_meta: Dict[str, Union[bool, int, float, str, list]]

@app.post("/reset_metadata")
def reset_metadata(self, reset_metadata_request: ResetMetadataRequest):
doc_ids = reset_metadata_request.doc_ids
new_meta = reset_metadata_request.new_meta
try:
docs = self._manager.get_docs(doc_ids)
if not docs:
return BaseResponse(code=400, msg="Failed, no doc found")
self._manager.set_docs_new_meta({doc.doc_id: new_meta for doc in docs})
return BaseResponse(data=None)
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)

class QueryMetadataRequest(BaseModel):
doc_id: str
key: Optional[str] = None

@app.post("/query_metadata")
def query_metadata(self, query_metadata_request: QueryMetadataRequest):
doc_id = query_metadata_request.doc_id
key = query_metadata_request.key
try:
docs = self._manager.get_docs(doc_id)
if not docs:
return BaseResponse(data=None)
doc = docs[0]
meta_dict = json.loads(doc.meta) if doc.meta else {}
if not key:
return BaseResponse(data=meta_dict)
if key not in meta_dict:
return BaseResponse(code=400, msg=f"Failed, key {key} does not exist")
return BaseResponse(data=meta_dict[key])
except Exception as e:
return BaseResponse(code=500, msg=str(e), data=None)
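For the remaining endpoints, calls in the same hedged style (same placeholder address and id), matching the behavior exercised in the tests below:

```python
# Overwrite existing keys, or create them if missing.
httpx.post(f"{doc_server_addr}/update_or_create_metadata_keys",
           json=dict(doc_ids=[doc_id], kv_pair={"signature": "signature2"}))

# Replace the user-defined metadata wholesale.
httpx.post(f"{doc_server_addr}/reset_metadata",
           json=dict(doc_ids=[doc_id],
                     new_meta={"author": "author2", "signature": "signature_new"}))

# Query all metadata for one document, or a single key by also passing key="signature".
resp = httpx.post(f"{doc_server_addr}/query_metadata", json=dict(doc_id=doc_id))
print(resp.json().get("data"))
```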

def __repr__(self):
return lazyllm.make_repr("Module", "DocManager")
3 changes: 3 additions & 0 deletions lazyllm/tools/rag/global_metadata.py
@@ -19,3 +19,6 @@ def __init__(self, data_type: int, element_type: Optional[int] = None,
RAG_DOC_CREATION_DATE = 'creation_date'
RAG_DOC_LAST_MODIFIED_DATE = 'last_modified_date'
RAG_DOC_LAST_ACCESSED_DATE = 'last_accessed_date'

RAG_SYSTEM_META_KEYS = set([RAG_DOC_ID, RAG_DOC_PATH, RAG_DOC_FILE_NAME, RAG_DOC_FILE_TYPE, RAG_DOC_FILE_SIZE,
RAG_DOC_CREATION_DATE, RAG_DOC_LAST_MODIFIED_DATE, RAG_DOC_LAST_ACCESSED_DATE])
15 changes: 15 additions & 0 deletions lazyllm/tools/rag/map_store.py
@@ -5,6 +5,7 @@
from .utils import _FileNodeIndex
from .default_index import DefaultIndex
from lazyllm.common import override
from .global_metadata import RAG_SYSTEM_META_KEYS

def _update_indices(name2index: Dict[str, IndexBase], nodes: List[DocNode]) -> None:
for index in name2index.values():
@@ -33,6 +34,20 @@ def update_nodes(self, nodes: List[DocNode]) -> None:
self._group2docs[node._group][node._uid] = node
_update_indices(self._name2index, nodes)

@override
def update_doc_meta(self, filepath: str, metadata: dict) -> None:
doc_nodes: List[DocNode] = self._name2index['file_node_map'].query([filepath])
if not doc_nodes:
return
root_node = doc_nodes[0].root_node
keys_to_delete = []
for k in root_node.global_metadata:
if not (k in RAG_SYSTEM_META_KEYS or k in metadata):
keys_to_delete.append(k)
for k in keys_to_delete:
root_node.global_metadata.pop(k)
root_node.global_metadata.update(metadata)
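A minimal sketch of what this does to a node's metadata, using stand-in key names (the real system keys are the RAG_* constants collected in RAG_SYSTEM_META_KEYS):

```python
SYSTEM_KEYS = {"docid", "path"}          # illustrative stand-ins for RAG_SYSTEM_META_KEYS
global_md = {"docid": "d1", "path": "a.txt", "title": "old", "tag": "x"}
new_md = {"title": "new"}

# Drop user keys absent from the new metadata, keep system keys,
# then overwrite/add everything in new_md -- same shape as update_doc_meta.
for k in [k for k in global_md if k not in SYSTEM_KEYS and k not in new_md]:
    global_md.pop(k)                     # removes "tag"
global_md.update(new_md)
# global_md == {"docid": "d1", "path": "a.txt", "title": "new"}
```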

@override
def remove_nodes(self, group_name: str, uids: List[str] = None) -> None:
if uids:
4 changes: 4 additions & 0 deletions lazyllm/tools/rag/milvus_store.py
@@ -165,6 +165,10 @@ def update_nodes(self, nodes: List[DocNode]) -> None:

self._map_store.update_nodes(nodes)

@override
def update_doc_meta(self, filepath: str, metadata: dict) -> None:
self._map_store.update_doc_meta(filepath, metadata)

@override
def remove_nodes(self, group_name: str, uids: Optional[List[str]] = None) -> None:
if uids:
4 changes: 4 additions & 0 deletions lazyllm/tools/rag/store_base.py
@@ -15,6 +15,10 @@ class StoreBase(ABC):
def update_nodes(self, nodes: List[DocNode]) -> None:
pass

@abstractmethod
def update_doc_meta(self, filepath: str, metadata: dict) -> None:
pass

@abstractmethod
def remove_nodes(self, group_name: str, uids: Optional[List[str]] = None) -> None:
pass
49 changes: 48 additions & 1 deletion lazyllm/tools/rag/utils.py
@@ -11,7 +11,7 @@
from lazyllm.common.queue import sqlite3_check_threadsafety
import sqlalchemy
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from sqlalchemy import Column, select, insert, update, Row
from sqlalchemy import Column, select, insert, update, Row, bindparam
from sqlalchemy.exc import NoResultFound

import pydantic
@@ -69,7 +69,9 @@ class KBGroup(KBDataBase):
group_id = Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
group_name = Column(sqlalchemy.String, nullable=False, unique=True)

DocMetaChangedRow = Row
GroupDocPartRow = Row

class KBGroupDocuments(KBDataBase):
__tablename__ = "kb_group_documents"

@@ -79,6 +81,7 @@ class KBGroupDocuments(KBDataBase):
status = Column(sqlalchemy.Text, nullable=True)
log = Column(sqlalchemy.Text, nullable=True)
need_reparse = Column(sqlalchemy.Boolean, default=False, nullable=False)
new_meta = Column(sqlalchemy.Text, nullable=True)
# unique constraint
__table_args__ = (sqlalchemy.UniqueConstraint('doc_id', 'group_name', name='uq_doc_to_group'),)

@@ -148,6 +151,15 @@ def list_files(self, limit: Optional[int] = None, details: bool = False,
status: Union[str, List[str]] = Status.all,
exclude_status: Optional[Union[str, List[str]]] = None): pass

@abstractmethod
def get_docs(self, doc_ids: List[str]) -> List[KBDocument]: pass

@abstractmethod
def set_docs_new_meta(self, doc_meta: Dict[str, dict]): pass

@abstractmethod
def fetch_docs_changed_meta(self, group: str) -> List[DocMetaChangedRow]: pass

@abstractmethod
def list_all_kb_group(self): pass

@@ -312,6 +324,41 @@ def list_files(self, limit: Optional[int] = None, details: bool = False,
cursor = conn.execute(query, params)
return cursor.fetchall() if details else [row[0] for row in cursor]

def get_docs(self, doc_ids: List[str]) -> List[KBDocument]:
with self._db_lock, self._Session() as session:
docs = session.query(KBDocument).filter(KBDocument.doc_id.in_(doc_ids)).all()
return docs
return []

def set_docs_new_meta(self, doc_meta: Dict[str, dict]):
data_to_update = [{"_doc_id": k, "_meta": json.dumps(v)} for k, v in doc_meta.items()]
with self._db_lock, self._Session() as session:
# Use sqlalchemy core bulk update
stmt = KBDocument.__table__.update().where(
KBDocument.doc_id == bindparam("_doc_id")).values(meta=bindparam("_meta"))
session.execute(stmt, data_to_update)
session.commit()

stmt = KBGroupDocuments.__table__.update().where(
KBGroupDocuments.doc_id == bindparam("_doc_id"),
KBGroupDocuments.status != DocListManager.Status.waiting).values(new_meta=bindparam("_meta"))
session.execute(stmt, data_to_update)
session.commit()

def fetch_docs_changed_meta(self, group: str) -> List[DocMetaChangedRow]:
rows = []
conds = [KBGroupDocuments.group_name == group, KBGroupDocuments.new_meta.isnot(None)]
with self._db_lock, self._Session() as session:
rows = (
session.query(KBDocument.path, KBGroupDocuments.new_meta)
.join(KBGroupDocuments, KBDocument.doc_id == KBGroupDocuments.doc_id)
.filter(*conds).all()
)
stmt = update(KBGroupDocuments).where(sqlalchemy.and_(*conds)).values(new_meta=None)
session.execute(stmt)
session.commit()
return rows
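set_docs_new_meta above relies on SQLAlchemy's executemany-style bulk UPDATE with bindparam (the bound names _doc_id and _meta deliberately differ from the column names). A self-contained sketch of the pattern with an illustrative table, not the repo's models:

```python
from sqlalchemy import Column, Integer, Text, bindparam, create_engine
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
    pass

class Doc(Base):
    __tablename__ = "docs"                     # illustrative table
    doc_id = Column(Integer, primary_key=True)
    meta = Column(Text, nullable=True)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Doc(doc_id=1), Doc(doc_id=2)])
    session.commit()
    # One compiled UPDATE, executed once per parameter dict.
    stmt = (Doc.__table__.update()
            .where(Doc.doc_id == bindparam("_doc_id"))
            .values(meta=bindparam("_meta")))
    session.execute(stmt, [{"_doc_id": 1, "_meta": '{"a": 1}'},
                           {"_doc_id": 2, "_meta": '{"b": 2}'}])
    session.commit()
```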

def list_all_kb_group(self):
with self._db_lock, sqlite3.connect(self._db_path, check_same_thread=self._check_same_thread) as conn:
cursor = conn.execute("SELECT group_name FROM document_groups")
39 changes: 39 additions & 0 deletions tests/basic_tests/test_document.py
@@ -228,6 +228,45 @@ def test_delete_files_in_store(self):
nodes = self.doc_impl.store.get_nodes(LAZY_ROOT_NAME)
assert len(nodes) == 1
assert nodes[0].global_metadata[RAG_DOC_ID] == test2_docid
cur_meta_dict = nodes[0].global_metadata

url = f'{self.doc_server_addr}/add_metadata'
response = httpx.post(url, json=dict(doc_ids=[test2_docid], kv_pair={"title": "title2"}))
assert response.status_code == 200 and response.json().get('code') == 200
time.sleep(20)
assert cur_meta_dict["title"] == "title2"

response = httpx.post(url, json=dict(doc_ids=[test2_docid], kv_pair={"title": "TITLE2"}))
assert response.status_code == 200 and response.json().get('code') == 200
time.sleep(20)
assert cur_meta_dict["title"] == ["title2", "TITLE2"]

url = f'{self.doc_server_addr}/delete_metadata_item'
response = httpx.post(url, json=dict(doc_ids=[test2_docid], keys=["signature"]))
assert response.status_code == 200 and response.json().get('code') == 200
time.sleep(20)
assert "signature" not in cur_meta_dict

response = httpx.post(url, json=dict(doc_ids=[test2_docid], kv_pair={"title": "TITLE2"}))
assert response.status_code == 200 and response.json().get('code') == 200
time.sleep(20)
assert cur_meta_dict["title"] == ["title2"]

url = f'{self.doc_server_addr}/update_or_create_metadata_keys'
response = httpx.post(url, json=dict(doc_ids=[test2_docid], kv_pair={"signature": "signature2"}))
assert response.status_code == 200 and response.json().get('code') == 200
time.sleep(20)
assert cur_meta_dict["signature"] == "signature2"

url = f'{self.doc_server_addr}/reset_metadata'
response = httpx.post(url, json=dict(doc_ids=[test2_docid],
new_meta={"author": "author2", "signature": "signature_new"}))
assert response.status_code == 200 and response.json().get('code') == 200
time.sleep(20)
assert cur_meta_dict["signature"] == "signature_new" and cur_meta_dict["author"] == "author2"

url = f'{self.doc_server_addr}/query_metadata'
response = httpx.post(url, json=dict(doc_id=test2_docid))

# make sure that only one file is left
response = httpx.get(f'{self.doc_server_addr}/list_files')