본문 바로가기
SageMaker

SageMaker, Streamlit, Opensearch를 사용한 RAG챗봇 구성하기 1. KoSimCSE-RoBERTa를 사용한 한국어 문장 임베딩

by Hyeon Cloud 2023. 11. 6.

목차

SageMaker, Streamlit, Opensearch를 사용한 RAG챗봇 구성하기 1. KoSimCSE-RoBERTa를 사용한 한국어 문장 임베딩

SageMaker, Streamlit, Opensearch를 사용한 RAG챗봇 구성하기 2. KoSimCSE-RoBERTAa SageMaker Studio 테스트

SageMaker, Streamlit, Opensearch를 사용한 RAG챗봇 구성하기 3. KULLM(구름)모델 AWS Large Model Container DLC사용하여 배포하기

SageMaker, Streamlit, Opensearch를 사용한 RAG챗봇 구성하기 4. FAQ with FAISS - Vector Store Test

SageMaker, Streamlit, Opensearch를 사용한 RAG챗봇 구성하기 5. FAQ with OpenSearch - Vector Store Test

SageMaker, Streamlit, Opensearch 사용한 RAG챗봇 구성하기 6. OpenSearch Rag Chatbot Application with Streamlit

 

HuggingFace 의 KoSimCSE-RoBERTa 모델을 사용하여 한국어 문장 임베딩을 수행해보겠습니다.

AWS의 비슷한 서비스로는 Amazon SageMaker Titan Embeddings 가 있으나, 한국어 문장의 경우 해당 모델의 유사성 지표가 높기때문에 이를 사용하겠습니다.

한국어 FAQ문장셋의 임베딩을 수행하고, 생성한 문장 벡터를 Faiss 엔진을 사용한 OpenSearch를 통해 검색하는 작업을 진행하겠습니다.

사용된 모델

BM-K/KoSimCSE-roberta

로컬테스트

로컬에서 KoSimCSE-RoBERTa 모델을 로드하여 유사도계산과 테스트를 진행해보겠습니다.

git lfs install
git clone <https://huggingface.co/BM-K/KoSimCSE-roberta>

localtest.ipynb

모델과 토크나이저를 로드하겠습니다.

import torch
from transformers import AutoModel, AutoTokenizer
model = AutoModel.from_pretrained('BM-K/KoSimCSE-roberta')
tokenizer = AutoTokenizer.from_pretrained('BM-K/KoSimCSE-roberta')

문장을 벡터임베딩 해보겠습니다.

sample = "나는 김현민"
inputs = tokenizer(sample, padding=True, truncation=True, return_tensors="pt")
embeddings, _ = model(**inputs, return_dict=False)
emb_len = len(embeddings[0][0])
print("Sample Sentence: \\n", sample)
print("Size of the Embedding Vector: ", emb_len)
print(f"First 10 Elements of the Embedding Vector (Total Elements: {emb_len}): \\n", embeddings[0][0][0:10])

Sample Sentence: 나는 김현민 Size of the Embedding Vector: 768 First 10 Elements of the Embedding Vector (Total Elements: 768): tensor([ 0.0241, -1.0539, 0.0650, -1.4253, 0.2898, -0.3100, -0.5611, 0.2530, 0.0636, -0.2037], grad_fn=<SliceBackward0>)

유사도를 구해 비교해보겠습니다.

# 코사인 유사도
def cal_score(a, b):
    if len(a.shape) == 1: a = a.unsqueeze(0)
    if len(b.shape) == 1: b = b.unsqueeze(0)

    a_norm = a / a.norm(dim=1)[:, None]
    b_norm = b / b.norm(dim=1)[:, None]
    return torch.mm(a_norm, b_norm.transpose(0, 1)) * 100
def show_embedding_score(tokenizer, model, sentences):
    inputs = tokenizer(sentences, padding=True, truncation=True, return_tensors="pt")
    embeddings, _ = model(**inputs, return_dict=False)

    score01 = cal_score(embeddings[0][0], embeddings[1][0])
    score02 = cal_score(embeddings[0][0], embeddings[2][0])

    print(score01, score02)
sentences1 = [sample,"나는 김현민",'동해물과 백두산이']

show_embedding_score(tokenizer, model, sentences1)

tensor([[100.0000]], grad_fn=<MulBackward0>) tensor([[11.6377]], grad_fn=<MulBackward0>)

SageMaker Endpoint 에 모델 배포

로컬에서 임베딩 모델테스트가 성공적으로 완료되었다면, Amazon SageMaker를 사용하여 프로덕션 환경에 배포해보겠습니다.

SageMaker를 사용하면 API호출을 통해 실시간으로 임베딩을 생성하거나 유사도를 계산할 수 있습니다.

필요한 라이브러리들을 호출하겠습니다. SageMaker의 HuggingFace 라이브러리도 임포트합니다.

import boto3
import sagemaker
from datetime import datetime
from sagemaker.huggingface import HuggingFaceModel
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

print(f"sagemaker role arn: {role}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker role arn: arn:aws:iam::xxxxxxxxxx:role/service-role/AmazonSageMaker-ExecutionRole-20231030T104069

EndPoint 생성시 허깅페이스 모델을 로드하도록 하겠습니다.

import threading
from datetime import datetime
from sagemaker.huggingface import HuggingFaceModel

# Hub Model configuration
hub = {
  'HF_MODEL_ID': 'BM-K/KoSimCSE-roberta',
  'HF_TASK': 'feature-extraction'
}

# Create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
   env=hub,
   role=role,
   transformers_version="4.26",
   pytorch_version="1.13",
   py_version="py39",
)

# Generate a unique endpoint name
time_stamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
endpoint_name = f"KoSimCSE-roberta-{time_stamp}"

# Function to deploy the model
def deploy_model():
    predictor = huggingface_model.deploy(
       initial_instance_count=1,
       endpoint_name=endpoint_name,
       instance_type="ml.g5.2xlarge"
    )
    print(f"Endpoint created: {endpoint_name}")

# Create a thread to run the deploy function in the background
deploy_thread = threading.Thread(target=deploy_model)

# Start the thread
deploy_thread.start()

# Optional: If you want to wait for the thread to complete
# deploy_thread.join()

print("Deployment is in progress in the background...")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml Deployment is in progress in the background…

import time
from IPython.display import display, HTML
def make_console_link(region, endpoint_name, task='[SageMaker LLM Serving]'):
    endpoint_link = f' {task} Check Endpoint Status'   
    return endpoint_link

def describe_endpoint(endpoint_name):
    '''
    엔드폰인트 생성 유무를 확인. 생성 중이면 기다림.
    '''
    sm_client = boto3.client("sagemaker")

    while(True):
        response = sm_client.describe_endpoint(
            EndpointName= endpoint_name
        )    
        status = response['EndpointStatus']
        if status == 'Creating':
            print("Endpoint is ", status)
            time.sleep(60)
        else:
            print("Endpoint is ", status)
            break

sess = sagemaker.session.Session()  # sagemaker session for interacting with different AWS APIs
region = sess._region_name  # region name of the current SageMaker Studio environment

endpoint_link = make_console_link(region, endpoint_name)
display(HTML(endpoint_link))

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml

[SageMaker LLM Serving] Check Endpoint Status

%%time
describe_endpoint(endpoint_name)

Endpoint is Creating --Endpoint is Creating --Endpoint is Creating --Endpoint is Creating --Endpoint is Creating -!Endpoint created: KoSimCSE-roberta-2023-10-30-04-30-24 Endpoint is InService CPU times: user 141 ms, sys: 13.5 ms, total: 155 ms Wall time: 5min

SageMaker 임베딩 모델 테스트

그대로 세이지메이커 스튜디오에서 진행하여도되고, 개인 환경에서 진행해도 무방합니다.

저는 개인환경에서 호출해보도록 하겠습니다. (AWS Configure 필요)

필요한 라이브러리 임포트 후 정상적으로 동작하는지 확인합니다.

import json
import boto3
import numpy as np
from sagemaker.predictor import Predictor
endpoint_name = "KoSimCSE-roberta-2023-10-30-04-30-24"
predictor = Predictor(endpoint_name=endpoint_name)
predictor

agemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /Users/hyeonminkim/Library/Application Support/sagemaker/config.yaml

<sagemaker.base_predictor.Predictor at 0x12fbb2c50>

boto3 invoke_endpoint() 메서드를 사용하여 inferecne(추론) 해보겠습니다.

def query_endpoint_embedding_with_json_payload(encoded_json, endpoint_name, content_type="application/json"):
    client = boto3.client("runtime.sagemaker")
    response = client.invoke_endpoint(
        EndpointName=endpoint_name, ContentType=content_type, Body=encoded_json
    )
    return response

def transform_output(output: bytes) -> str:
    response_json = json.loads(output.read().decode("utf-8"))
    # return response_json
    return response_json[0][0]
sentences2_1 = "타기관OTP 이용등록방법 알려주세요"
sentences2_2 = "OTP 이용등록방법 알려주세요"

payload_2_1 = {
    "inputs" : sentences2_1
}

payload_2_2 = {
    "inputs" : sentences2_2
}

# 첫번째 문장
query_response = query_endpoint_embedding_with_json_payload(
    json.dumps(payload_2_1).encode("utf-8"), endpoint_name=endpoint_name
)

emb_1 = transform_output(query_response['Body'])
print("첫문장 임베딩 사이즈: ", len(emb_1))

# 두번째 문장
query_response = query_endpoint_embedding_with_json_payload(
    json.dumps(payload_2_2).encode("utf-8"), endpoint_name=endpoint_name
)
 
emb_2 = transform_output(query_response['Body'])
print("두번째 문장 임베딩 사이즈: ", len(emb_2))

첫문장 임베딩 사이즈: 768 두번째 문장 임베딩 사이즈: 768

import torch
def cal_score(a, b):
    '''
    코사인 유사도 구하는 함수
    '''
    if len(a.shape) == 1: a = a.unsqueeze(0)
    if len(b.shape) == 1: b = b.unsqueeze(0)

    a_norm = a / a.norm(dim=1)[:, None]
    b_norm = b / b.norm(dim=1)[:, None]
    return torch.mm(a_norm, b_norm.transpose(0, 1)) * 100
def show_embedding_score3(emb1, emb2):

    embeddings_0 = torch.Tensor(emb1) 
    embeddings_1 = torch.Tensor(emb2)

    score01 = cal_score(embeddings_0, embeddings_1)

    print(score01)

show_embedding_score3(emb_1, emb_2)

tensor([[84.7164]])

엔드포인트 삭제

다음 코드를 사용해 생성하였던 엔드포인트를 삭제할 수 있습니다.

class clean_up():
    
    def __init__(self, ):    
        pass
    
    def delete_endpoint(self, client, endpoint_name ,is_del_model=True):
        
        response = client.describe_endpoint(EndpointName=endpoint_name)
        EndpointConfigName = response['EndpointConfigName']

        response = client.describe_endpoint_config(EndpointConfigName=EndpointConfigName)
        model_name = response['ProductionVariants'][0]['ModelName']    

        if is_del_model: # 모델도 삭제 여부 임.
            client.delete_model(ModelName=model_name)    

        client.delete_endpoint(EndpointName=endpoint_name)
        client.delete_endpoint_config(EndpointConfigName=EndpointConfigName)    

        print(f'--- Deleted model: {model_name}')
        print(f'--- Deleted endpoint: {endpoint_name}')
        print(f'--- Deleted endpoint_config: {EndpointConfigName}')
endpoint_name_emb = endpoint_name
%store endpoint_name_emb
clean = clean_up()
sm_client = boto3.client('sagemaker')

## 2.training 
clean.delete_endpoint(sm_client, endpoint_name ,is_del_model=True)