안녕하세요? 이번 글에서는 Milvus와 BGE-M3 모델을 사용하여 문서를 임베딩하고 이를 Milvus에 저장하는 방법,
그리고 이 데이터를 활용해 AI 기반 질의응답 시스템을 구축하는 방법을 소개하겠습니다.
Milvus는 대규모 벡터 데이터를 저장하고 검색하는 데 최적화된 고성능 벡터 데이터베이스입니다.
BGE-M3 모델은 중국 베이징 AI 연구소에서 개발한 언어 모델로, 문서 임베딩 작업과 이를 활용한 AI 질의응답 시스템 구현에 매우 유용합니다.
이 글에서는 Milvus에 문서를 저장하는 방법뿐만 아니라, 저장된 데이터를 기반으로 질문에 대한 답변을 생성하는 AI 질의응답 시스템을 구현하는 과정을 단계별로 안내합니다.
1. 필수 패키지 설치
먼저, 필요한 패키지를 설치해야 합니다. 아래 명령어를 사용해 Milvus 및 관련 패키지를 설치할 수 있습니다.
pip install --upgrade pymilvus
pip install "pymilvus[model]"
2. BGE-M3 모델 설정
BGE-M3 모델을 사용하기 위해 BGEM3EmbeddingFunction 클래스를 사용합니다.
이 클래스를 통해 모델을 초기화하고, 문서 및 쿼리를 임베딩할 수 있습니다.
아래 코드는 BGE-M3 모델을 설정하는 예시입니다.
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
bge_m3_ef = BGEM3EmbeddingFunction(
model_name='BAAI/bge-m3', # 모델 이름 지정
device='cpu', # 사용할 장치 지정 ('cpu' 또는 'cuda:0')
use_fp16=False # fp16 사용 여부 (cpu를 사용할 경우 False로 설정)
)
여기서 코드에 노란 줄이나 경고가 발생할 수 있지만, 이는 무시해도 됩니다.
만약 불필요한 경고를 제거하고 싶다면 다음과 같이 코드를 수정할 수 있습니다.
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)
from pymilvus import model
bge_m3 = model.hybrid.BGEM3EmbeddingFunction(
model_name='BAAI/bge-m3',
device='cuda:0',
use_fp16=False
)
milvus_bge-m3링크 : https://milvus.io/docs/embed-with-bgm-m3.md
3. Milvus와 BGE-M3 임베딩 통합
이제 Milvus와 BGE-M3 모델을 통합하여 문서를 임베딩하고 저장하는 기능을 구현하겠습니다.
이를 위해 MilvusManager 클래스를 작성합니다.
이 클래스는 Milvus와의 연결을 관리하고, 임베딩된 데이터를 Milvus에 저장하는 역할을 합니다.
3-1. BGE-M3 임베딩 래퍼 클래스
먼저, BGE-M3 모델을 간편하게 사용할 수 있도록 래퍼 클래스를 정의합니다.
※ 기존 retriever 코드를 사용을 할려고 하면 embed_query의 함수를 사용하는데
Milvus model에서 지원하는 bge-m3는 encode_queries를 사용하면서 에러가 발생하여 래퍼 클래스를 따로 정의하였습니다.
class BGEM3EmbeddingWrapper:
def __init__(self, embedding_function):
self.embedding_function = embedding_function
def embed_query(self, query):
if not query:
pass
else:
embeddings = self.embedding_function.encode_queries([query])
dense_embeddings = embeddings["dense"][0]
return dense_embeddings
def embed_documents(self, texts: list[str]):
if not texts:
pass
else:
embeddings = self.embedding_function.encode_documents(texts)
return embeddings
3-2. Milvus Manager 클래스
이제 Milvus에 연결하고 데이터를 삽입하는 기능을 갖춘 "MilvusManager" 클래스를 정의합니다.
from pymilvus import (
CollectionSchema, DataType, FieldSchema, Collection, connections, utility, model
)
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)
from langchain_milvus import Milvus
bge_m3 = model.hybrid.BGEM3EmbeddingFunction(
model_name = 'BAAI/bge-m3',
device = 'cuda:0',
use_fp16 = False
)
class BGEM3EmbeddingWrapper:
def __init__(self, embedding_function):
self.embedding_function = embedding_function
def embed_query(self, query):
if not query:
pass
else:
embeddings = self.embedding_function.encode_queries([query])
dense_embeddings = embeddings["dense"][0]
return dense_embeddings
def embed_documents(self, texts: list[str]):
if not texts:
pass
else:
embeddings = self.embedding_function.encode_documents(texts)
return embeddings
class MilvusManager:
def __init__(self):
self.dbname = "default"
self.connection_args = {
# uri는 사용하시는 Milvus 주소를 입력하시면 됩니다.
"uri": "자신의 주소",
"db_name": self.dbname,
}
# collection_name은 처음에 생성하면서 만든 이름을 넣으시면 됩니다.
self.collection_name = "example_collection"
self.milvus = None
if self.connect():
self.milvus = self.get_milvus()
def connect(self):
try:
print("Milvus와 연결 중...")
connections.connect(
alias=self.dbname,
**self.connection_args
)
if connections.has_connection(self.dbname):
print("Milvus와 연결되었습니다.")
return True
else:
print("Milvus 연결 실패: 연결이 확인되지 않았습니다.")
return False
except Exception as e:
print(f"Milvus 연결 실패: {e}")
return False
def get_milvus(self):
try:
load_embedding = BGEM3EmbeddingWrapper(bge_m3)
milvus = Milvus(
connection_args=self.connection_args,
collection_name=self.collection_name,
embedding_function=load_embedding,
vector_field='text_vector'
)
return milvus
except Exception as e:
print(f"Milvus 객체 생성 실패: {e}")
return None
def insert_data(self, texts, text_vectors):
if not self.connect():
print("Milvus와 연결이 안되어있습니다.")
return
try:
collection = Collection(name=self.collection_name, using=self.dbname)
entities = [
texts,
text_vectors
]
collection.insert(entities)
# 데이터 삽입 후 flush 호출
collection.flush()
# 메모리 로드
collection.load()
print(f"'{self.collection_name}'이 메모리에 로드되었습니다.")
except Exception as e:
print(f"문서가 {self.collection_name}에 삽입 실패하였습니다. \n{e}")
def create_collection(self, collection_name):
try:
# Milvus 연결
if not self.connect():
return
# 컬렉션이 이미 존재하는지 확인
if utility.has_collection(collection_name, using=self.dbname):
print(f"Collection '{collection_name}'이 이미 존재합니다.")
return
fields = [
FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="text_vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
]
collection_schema = CollectionSchema(
fields=fields,
enable_dynamic_fields=True,
description=f"{collection_name}의 컬렉션"
)
collection = Collection(name=collection_name, schema=collection_schema, using=self.dbname)
print(f"Collection: '{collection_name}'생성되었습니다.")
index_params = {
"metric_type": "COSINE",
"index_type": "HNSW",
"params": {"M": 64, "efConstruction": 200}
}
collection.create_index(field_name="text_vector", index_params=index_params)
print(f"'text_vector' 인덱스 생성 완료")
except Exception as e:
print(f"생성 실패'{collection_name}': {e}")
3.3 파일 로드 및 임베딩
파일을 로드하고 이를 임베딩하여 Milvus에 저장하는 과정을 구현합니다.
txt파일 형식을 지원하도록 작성하였습니다.
제가 넣은 데이터는 법령 데이터 "개인정보보호법"을 그냥 복사 후 txt 파일로 만들었습니다.
[코드]
import os
from langchain.text_splitter import RecursiveCharacterTextSplitter
from milvus.Milvus import MilvusManager
from langchain_community.document_loaders import TextLoader
# Milvusmanager 함수 초기화
milvus_manager = MilvusManager()
# 자신의 데이터가 있는 파일 경로
file_path = ""
def list_files(directory):
try:
# 디렉토리 내의 모든 파일 및 폴더 이름 가져오기
file_names = os.listdir(directory)
# 파일만 필터링
file_names = [f for f in file_names if os.path.isfile(os.path.join(directory, f))]
return file_names
except FileNotFoundError:
return "Directory not found"
except Exception as e:
return str(e)
file_names = list_files(file_path)
uploaded_file_names = []
for file in file_names:
file_full_path = os.path.join(file_path, file)
with open(file_full_path, "rb") as f:
content = f.read()
uploaded_file_names.append(file)
loader = None
# txt 확장자
if file.endswith(".txt"):
loader = TextLoader(file_full_path, encoding='utf-8')
# 문서를 로드 후 텍스트를 분활.
if loader is not None:
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=128,
separators=["\n\n"]
)
processed_texts = []
for document in documents:
split_texts = text_splitter.split_text(document.page_content)
processed_texts.extend(split_texts)
processed_vector = milvus_manager.milvus.embedding_func.embed_documents(processed_texts)
# 빈 processed_texts 처리
if not processed_vector:
print("처리된 텍스트가 없습니다.")
continue
try:
texts, text_vectors = [], []
for idx, (processed_texts, dense_vector) in enumerate(zip(processed_texts, processed_vector["dense"])):
texts.append(processed_texts)
text_vectors.append(dense_vector)
milvus_manager.insert_data(texts, text_vectors)
except Exception as e:
print(f"데이터 처리 중 오류 발생: {e}")
continue
else:
print("지원되지 않는 파일 형식")
데이터가 이렇게 잘 들어간것을 보실 수 있습니다.
해당 프로그램은 attu라는 모니터링을 도움을 주는 프로그램입니다.
다운로드 링크 : https://github.com/zilliztech/attu
위 링크에 접속 후 Releases를 클릭하시면 파일을 다운로드 하실 수 있습니다.
RAG의 꽃 Retriever 시작
이제 임베딩이 잘 이루어졌으니, Retriever를 사용하여 저장된 데이터를 기반으로 질문에 대한 답변을 생성해 보겠습니다.
이번 예제에서는 langchain_community.llms.ollama의 Ollama 모델과 MilvusManager를 활용하여 간단한 질의응답 체인을 만들어 보겠습니다.
[코드]
from langchain_community.llms.ollama import Ollama
from milvus.Milvus import MilvusManager
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# Milvus에서 데이터 불러오기
milvus_manager = MilvusManager().get_milvus()
milvus_retriever = milvus_manager.as_retriever()
# 불러온 문서들을 형식에 맞게 출력하는 함수
def format_docs(docs):
for doc in docs:
print("가져온 문서들: ", doc.page_content)
return "\n\n".join(doc.page_content for doc in docs)
# Ollama 모델 초기화
llm = Ollama(model="gemma2")
# 프롬프트 템플릿 생성
prompt = PromptTemplate.from_template("""
너는 도움을 주는 AI 언어모델이야.
모든 답변은 한글(Korean)로 대답하고, 주어진 context에 기반해서 대답해.
잘 모르는 정답이면 "잘 모르겠습니다."라고 출력해.
#Question:
{question}
#Context:
{context}
#Answer:"""
)
# 질의응답 체인 구성
chain = (
{"context": milvus_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# 질의응답 실행
print("\nAI 답변:\n", chain.invoke("개인정보란 무엇인가요?"))
위 코드에서는 Milvus에 저장된 데이터를 기반으로 Retriever를 사용하여 문서를 검색한 뒤,
해당 문서를 Ollama 모델에 전달하여 질문에 대한 답변을 생성합니다.
이 과정은 다음과 같이 이루어집니다:
- Milvus에서 문서 검색: MilvusManager에서 제공하는 as_retriever() 메서드를 통해 문서 검색 기능을 활성화합니다.
- 문서 형식화: 검색된 문서를 format_docs 함수를 통해 적절한 형식으로 변환합니다.
- Ollama 모델 사용: Ollama 모델을 사용하여 문서의 내용에 기반한 답변을 생성합니다.
- 질의응답 실행: 마지막으로 chain.invoke() 메서드를 통해 질문을 입력하면,
AI가 해당 질문에 대한 답변을 생성하여 출력합니다.
이 코드를 실행하면, Milvus에 저장된 문서들에서 필요한 정보를 검색하고 이를 기반으로 답변을 생성할 수 있습니다.
Retriever 기반의 시스템은 특히 대규모 데이터베이스에서 빠르고 효율적인 검색을 가능하게 하며,
RAG(Retrieval-Augmented Generation) 접근 방식을 통해 높은 정확도의 응답을 생성할 수 있습니다.
[가져온 문서]
해상도가 낮지만 문서를 가져왔습니다.
가져온 문서를 보고 싶으시면 아래의 [더보기]을 클릭하시면 됩니다.
가져온문서들: 개인정보보호위원회(분쟁조정과 - 분쟁조정, 손해배상책임), 1833-6972, 02-2100-3142
개인정보보호위원회(범정부마이데이터 추진단(전략기획팀 - 전송요구권(마이데이터)), 02-2100-3173
제1장 총칙
조문체계도버튼연혁
제1조(목적) 이 법은 개인정보의 처리 및 보호에 관한 사항을 정함으로써 개인의 자유와 권리를 보호하고, 나아가 개인의 존엄과 가치를 구현함을 목적으로 한다. <개정 2014. 3. 24.>
조문체계도버튼연혁생활법령버튼
제2조(정의) 이 법에서 사용하는 용어의 뜻은 다음과 같다. <개정 2014. 3. 24., 2020. 2. 4., 2023. 3. 14.>
1. “개인정보”란 살아 있는 개인에 관한 정보로서 다음 각 목의 어느 하나에 해당하는 정보를 말한다.
가. 성명, 주민등록번호 및 영상 등을 통하여 개인을 알아볼 수 있는 정보
가져온문서들: 1. “개인정보”란 살아 있는 개인에 관한 정보로서 다음 각 목의 어느 하나에 해당하는 정보를 말한다.
나. 해당 정보만으로는 특정 개인을 알아볼 수 없더라도 다른 정보와 쉽게 결합하여 알아볼 수 있는 정보. 이 경우 쉽게 결합할 수 있는지 여부는 다른 정보의 입수 가능성 등 개인을 알아보는 데 소요되는 시간, 비용, 기술 등을 합리적으로 고려하여야 한다.
술 등을 합리적으로 고려하여야 한다.
다. 가목 또는 나목을 제1호의2에 따라 가명처리함으로써 원래의 상태로 복원하기 위한 추가 정보의 사용ㆍ결합 없이는 특정 개인을 알아볼 수 없는 정보(이하 “가명정보”라 한다)
1의2. “가명처리”란 개인정보의 일부를 삭제하거나 일부 또는 전부를 대체하는 등의 방법으로 추가 정보가 없이는 특정 개인을 알아볼 수 없도록 처리하는 것을 말한다.
가져온문서들: 1의2. “가명처리”란 개인정보의 일부를 삭제하거나 일부 또는 전부를 대체하는 등의 방법으로 추가 정보가 없이는 특정 개인을 알아볼 수 없도록 처리하는 것을 말한다.
2. “처리”란 개인정보의 수집, 생성, 연계, 연동, 기록, 저장, 보유, 가공, 편집, 검색, 출력, 정정(訂正), 복구, 이용, 제공, 공개, 파기(破棄), 그 밖에 이와 유사한 행위를 말한다.
3. “정보주체”란 처리되는 정보에 의하여 알아볼 수 있는 사람으로서 그 정보의 주체가 되는 사람을 말한다.
4. “개인정보파일”이란 개인정보를 쉽게 검색할 수 있도록 일정한 규칙에 따라 체계적으로 배열하거나 구성한 개인정보의 집합물(集合物)을 말한다.
5. “개인정보처리자”란 업무를 목적으로 개인정보파일을 운용하기 위하여 스스로 또는 다른 사람을 통하여 개인정보를 처리하는 공공기관, 법인, 단체 및 개인 등을 말한다.
6. “공공기관”이란 다음 각 목의 기관을 말한다.
가져온문서들: 조문체계도버튼연혁관련규제버튼생활법령버튼 는 개인정보파일에 대하여 개인정보 처리방침을 정한다. <개정 2
제30조(개인정보 처리방침의 수립 및 공개) ① 개인정보처리자는 다음 각 호의 사항이 포함된 개인정보의 처리 방침(이하 “개인정보 처리방침”이라 한다)을 정하여야 한다. 이 경우 공공기관은 제32조에 따라 등록대상이
되는 개인정보파일에 대하여 개인정보 처리방침을 정한다. <개정 2016. 3. 29., 2020. 2. 4., 2023. 3. 14.>
1. 개인정보의 처리 목적
2. 개인정보의 처리 및 보유 기간
3. 개인정보의 제3자 제공에 관한 사항(해당되는 경우에만 정한다)
3의2. 개인정보의 파기절차 및 파기방법(제21조제1항 단서에 따라 개인정보를 보존하여야 하는 경우에는 그 보존근거와 보존하는 개인정보 항목을 포함한다)
3의3. 제23조제3항에 따른 민감정보의 공개 가능성 및 비공개를 선택하는 방법(해당되는 경우에만 정한다)
4. 개인정보처리의 위탁에 관한 사항(해당되는 경우에만 정한다)
[AI답변]
아래의 [더보기]를 클릭하시면 답변내용을 보실 수 있습니다.
AI답변:
주어진 텍스트에 따르면, '개인정보'란 살아있는 개인에 관한 정보로서 다음과 같은 조건을 충족하는 정보를 말합니다.
1. 성명, 주민등록번호 및 영상 등으로 개인을 알아볼 수 있는 정보입니다.
2. 해당 정보만으로는 특정 개인을 알아보지 못하더라도 다른 정보와 쉽게 결합하여 알아볼 수 있는 정보입니다. (결합의 용이성은 다른 정보 입수 가능성, 소요 시간, 비용, 기술 등을 고려해야 합니다)
3. 가명처리된 후 원래 상태로 복원하기 위해 추가적인 정보가 필요하지 않고, 해당 정보만으로는 특정 개인을 알아볼 수 없는 가명정보입니다.
간단히 말해서, 누군가를 식별하거나 찾아낼 수 있는 모든 종류의 데이터가 '개인정보'에 속합니다.
오늘은 Milvus의 as_retriever 함수와 체인을 사용하여 AI 답변을 생성하는 방법을 알아보았습니다.
이 과정을 통해 Milvus와 LLM을 통합하여 효과적으로 질의응답 시스템을 구현할 수 있다는 것을 확인할 수 있었습니다.
Milvus를 사용하면서 많은 분들이 좋은 경험을 쌓으시길 바라며, 앞으로도 다양한 프로젝트에서 유용하게 활용하시길 기원합니다.
감사합니다!
'VectorDB' 카테고리의 다른 글
[VectorDB] 밀버스(milvus) DB 데이터 csv 추출 튜토리얼 (1) | 2024.09.30 |
---|---|
[VectorDB]Milvus와 하이브리드 서치을 활용한 고급 벡터 검색 구현하기 (1) | 2024.09.03 |
[VectorDB] Milvus 연결 + 컬렉션 및 스키마 생성하기 (0) | 2024.08.26 |
[VectorDB] Milvus docker에 설치하기 (1) | 2024.08.22 |
[VectorDB] Milvus 정의 (0) | 2024.08.22 |