파이토치 간단한 ai 챗봇 소스 트랜스포머 모델이 아니라 성능은 안 좋음 초보자 분석용으로 추천 ~ > ai 자유게시판

본문 바로가기
사이트 내 전체검색

ai 자유게시판

파이토치 간단한 ai 챗봇 소스 트랜스포머 모델이 아니라 성능은 안 좋음 초보자 분석용으로 추천 ~

페이지 정보

profile_image
작성자 최고관리자
댓글 0건 조회 86회 작성일 24-04-25 04:18

본문

import torch
import torch.nn as nn

from io import open
import string

import re
import time
import random
import torchtext

from torchtext import data
import torch.optim as optim
from collections import Counter

from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data import DataLoader, Dataset



device = 'cuda' if torch.cuda.is_available() else 'cpu'

# for reproducibility
torch.manual_seed(777)
if device == 'cuda':
    torch.cuda.manual_seed_all(777)


#####

MAX_LENGTH = 100  #문장의 최대 길이
max_vocab_size=150000


optimizer_learning_rate  = 0.03  # modified learning rate from 0.01 to 1
drop_out_rate = 0.01

# 파라미터 설정
embedding_dim = 1200
batch_size = 32
learning_rate = 0.002
num_epochs = 30

#####
import gc
gc.collect()



# 소문자, 다듬기, 그리고 문자가 아닌 문자 제거


def normalizeString(s):

    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Zㄱ-ㅎㅏ-ㅣ가-힣.!?]+", r" ", s)
    return s



import requests

def readLangs():
    print("Reading lines...")

    response = requests.get('https://magu.co.kr/ai/sort-talk-and-gam.htm')
    lines = response.text.strip().split('\n')

    # 파일을 읽고 줄로 분리
    #lines = open(g_sourcetext, encoding='utf-8').\
    #    read().strip().split('\n')

    #print(lines)


    # 모든 줄을 쌍으로 분리하고 정규화
    pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]

    #print(pairs)
    temp = 0
    temp2 = len(pairs)/2
    input_lang = []
    output_lang = []

    for i in range(int(temp2)):  #8이면 4

        input_lang = input_lang + pairs[temp]  #0,2,4,6,
        output_lang = output_lang+pairs[temp+1]
        temp = temp +2



    return input_lang, output_lang, pairs



######################################################################
# 데이터 준비를 위한 전체 과정:
#
# -  텍스트 파일을 읽고 줄로 분리하고, 줄을 쌍으로 분리합니다.
# -  쌍을 이룬 문장들로 단어 리스트를 생성합니다.


def prepareData():
    input_lang, output_lang, pairs = readLangs()
    print("Read %s sentence pairs" % len(pairs))

    return input_lang, output_lang, pairs



inputs_data, outpus_data, pairs = prepareData()

print("outpus_data[5]", outpus_data[5])




#########################################



# 어휘 구성
def build_vocabulary(data, max_vocab_size):
    counter = Counter()
    for sentence in data:
        tokens = sentence.split()  # 단어 수준으로 토큰화
                                  # 중복을 제거하고 고유한 토큰 집합을 생성
        unique_tokens = list(set(tokens))

        counter.update(unique_tokens)

    # 가장 많이 나타나는 단어들을 선택하여 어휘를 구성
    vocab = {word: idx for idx, (word, count) in enumerate(counter.most_common(max_vocab_size))}
    vocab['<PAD>'] = len(vocab)  # 패딩 토큰을 어휘에 추가
    vocab['<UNK>'] = len(vocab)

    vocab2text = {idx: word for word, idx in vocab.items()}

    return vocab,vocab2text


g_vocab, g_vocab2text = build_vocabulary(inputs_data+ outpus_data, max_vocab_size)



print("len(g_vocab2text)",len(g_vocab2text))

# 토큰화 및 어휘 인덱싱
def tokenize_and_index(data, vocab):

    print("data[5]",data[5])
    original_word = data[5]


    tokenized_data = []
    for sentence in data:
        tokens = sentence.split()  # 단어 수준으로 토큰화
        indices = [vocab.get(token, vocab.get('<UNK>', len(vocab))) for token in tokens]
        tokenized_data.append(indices)


    print("tokenized_data[5]",tokenized_data[5])
    original_word = tokenized_data[5]
    original_words =""
    for i in range(len(original_word)):
            # 각 요소를 추출하여 스칼라 값으로 변환 (필요하면)
        original_words = original_words + " "+  g_vocab2text[original_word[i]]
    print("original_word len(original_word)",len(original_word))
    print("original_word",original_words)

    return tokenized_data



# 패딩
def pad_sequences(data, pad_idx, max_length):
    padded_data = []
    for sequence in data:
        padded_sequence = sequence[:max_length]
        padded_sequence += [pad_idx] * (max_length - len(padded_sequence))
        padded_data.append(padded_sequence)
    return torch.tensor(padded_data)

# 데이터 준비 단계
def prepare_data(questions, answers, max_length=MAX_LENGTH):
    # 어휘 구성
   
    pad_idx = g_vocab['<PAD>']  # 패딩 인덱스

    # 질문과 답변을 토큰화하고 어휘 인덱스에 맞게 변환
    questions_indexed = tokenize_and_index(questions, g_vocab)
    answers_indexed = tokenize_and_index(answers, g_vocab)

    # 패딩 적용
    questions_padded = pad_sequences(questions_indexed, pad_idx, max_length)
    answers_padded = pad_sequences(answers_indexed, pad_idx, max_length)

    return questions_padded, answers_padded


# 데이터 준비
questions_padded, answers_padded = prepare_data(inputs_data, outpus_data)




def adjust_data_lengths(questions_padded, answers_padded):
    # 두 데이터셋의 행 수를 비교
    num_questions = questions_padded.shape[0]
    num_answers = answers_padded.shape[0]

    # 데이터셋의 행 수를 짧은 것에 맞추어 조정
    if num_questions < num_answers:
        # answers_padded를 짧은 길이에 맞춰 자르기
        answers_padded = answers_padded[:num_questions]
    elif num_questions > num_answers:
        # questions_padded를 짧은 길이에 맞춰 자르기
        questions_padded = questions_padded[:num_answers]

    return questions_padded, answers_padded

# 데이터의 행 크기 조정 행 크기 동일하게 짧은 길이에 맞게 자르기
questions_padded, answers_padded = adjust_data_lengths(questions_padded, answers_padded)



# 결과 확인
#print(inputs_data)
print("질문 패딩 후:", questions_padded.size())
print("답변 패딩 후:", answers_padded.size())
#print("어휘:", vocab)




# 데이터셋 클래스
class ChatbotDataset(Dataset):
    def __init__(self, questions, answers):
        self.questions = questions
        self.answers = answers

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        return self.questions[idx], self.answers[idx]




# 모델 정의
class ChatbotModel(nn.Module):
    def __init__(self):
        super(ChatbotModel, self).__init__()
        self.embedding = nn.Embedding(len(g_vocab2text), embedding_dim)  #  임베딩 레이어
        self.elu = nn.ELU(alpha=1.0)
        self.dropout = nn.Dropout(drop_out_rate)
        self.output_layer = nn.Linear(embedding_dim, len(g_vocab2text))  # 출력 레이어:
       
       

    def forward(self, x):
        x = self.embedding(x)  # 임베딩 레이어를 통해 입력 처리
        x= self.elu(x)
        x = self.dropout(x)
        output = self.output_layer(x)  # 출력 레이어를 통해 최종 출력 계산
        return output




# 모델 생성
model = ChatbotModel().to(device)

# 손실 함수 및 옵티마이저 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 데이터셋을 미리 지정한 장치로 이동시킵니다.
questions_padded = questions_padded.to(device)
answers_padded = answers_padded.to(device)



# 데이터셋과 데이터 로더 생성
dataset = ChatbotDataset(questions_padded, answers_padded)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# 문장에서 <pad> 토큰을 삭제하는 함수
def remove_pad_from_sentence(sentence, pad_token='<PAD>'):
    # str.replace() 메서드를 사용하여 <pad> 토큰을 빈 문자열로 대체하여 삭제
    cleaned_sentence = sentence.replace(pad_token, "")
    return cleaned_sentence.strip()  # 문장 양쪽 공백을 제거하여 깔끔한 문자열 반환

import time

# 모델 훈련
for epoch in range(num_epochs):
    total_loss = 0

    start_time = time.time()  # 시작 시간 기록
    # 배치별로 데이터 처리
    #for question_batch, answer_batch in dataloader:
    for i, (question_batch, answer_batch) in enumerate(dataloader):


        start_time = time.time()  # 시작 시간 기록
       
        # 모델의 예측 수행
        output_batch = model(question_batch)

        end_time = time.time()  # 종료 시간 기록

        elapsed_time = end_time - start_time  # 총 학습 시간 계산
        print(f"i {i} 배치 크기 {batch_size}: 학습 시간 {elapsed_time:.4f} 초")
   

        loss = criterion(output_batch.view(-1, len(g_vocab2text)), answer_batch.view(-1))
        total_loss += loss.item()

        # 옵티마이저 초기화 및 가중치 업데이트
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    end_time = time.time()  # 종료 시간 기록

    elapsed_time = end_time - start_time  # 총 학습 시간 계산
    print(f"배치 크기 {batch_size}: 학습 시간 {elapsed_time:.4f} 초")

    # 각 에포크별 손실 출력
    if epoch % 1 == 0:

        print(f'Epoch {epoch + 1}, Loss: {total_loss / len(dataloader):.4f}')



        sorted_probs, sorted_indices = torch.sort(output_batch[1], dim=-1, descending=True)
   

        _, predicted_index = torch.max(output_batch[2], dim=-1)
        #가장 확률 높은 단어 추출

        predicted_word = ""
        for i in range(len(predicted_index)):
            predicted_word = predicted_word + " "+  g_vocab2text[predicted_index[i].item()]
        print("predicted_word",remove_pad_from_sentence(predicted_word))


        original_word = answer_batch[2]
        question_word = question_batch[2]

        question_words =""
        for i in range(30):
            question_words = question_words + " "+  g_vocab2text[question_word[i].item()]
        print("question_word", remove_pad_from_sentence(question_words))

        original_words =""
        for i in range(30):
            original_words = original_words + " "+  g_vocab2text[original_word[i].item()]
        print("original_word", remove_pad_from_sentence(original_words))



########################################
#모델 저장
       
import re, os
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok=True)

#모델 저장
model_save_path = os.path.join(save_dir, "chatbot_model.pth")
torch.save(
    {
        "model_state_dict": model.state_dict(),
        "g_vocab": g_vocab,  # 어휘 정보 저장
        "g_vocab2text": g_vocab2text  # 인덱스에서 단어로 변환하기 위한 사전 저장
    },
    model_save_path,
)

########################################
#모델 로드
model2 = ChatbotModel().to(device)

model_load_path = os.path.join(save_dir, "chatbot_model.pth")
checkpoint = torch.load(model_load_path)
model2.load_state_dict(checkpoint["model_state_dict"])

# 어휘 정보 로드
vocab = checkpoint["g_vocab"]
vocab2text_loaded = checkpoint["g_vocab2text"]


#tokenized_data= 'ㅋㅋ 마음 같아서는 대신 먹어주고 싶다 이거죠 .'
tokenized_data = '[1, 460, 461, 462, 463, 77, 464, 0,0,0,0,0,0,0,0,0,0,  0,0,0,0,0,0,0,0,0,0,  0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,]'

tokenized_data_list = eval(tokenized_data)

# 리스트를 파이토치 텐서로 변환
tokenized_data_tensor = torch.tensor(tokenized_data_list).to(device)

print(tokenized_data_tensor)


output_a = model2(tokenized_data_tensor).to(device)

print("output_a", output_a,output_a.size())


_, predicted_index = torch.max(output_a, dim=-1)

       
predicted_word =""

for i in range(50):
    predicted_word = predicted_word + " "+  vocab2text_loaded[predicted_index[i].item()]
print("answer", remove_pad_from_sentence(predicted_word))

첨부파일

댓글목록

등록된 댓글이 없습니다.


회사소개 개인정보취급방침 서비스이용약관 모바일 버전으로 보기 상단으로

Magu.co.kr. All rights reserved.