A Complete Code Walkthrough: Building a Chatbot with TensorFlow


A chatbot is one of the flagship applications of artificial intelligence, and deep learning makes it possible to build much more capable dialogue systems. This article walks through how to build a simple chatbot based on a sequence-to-sequence (Seq2Seq) model with the TensorFlow framework, with complete code and a detailed explanation of each step.

1. Environment Setup

First, make sure you have a working Python environment and the required libraries. The main dependencies are:

  • TensorFlow (version >= 2.x recommended)
  • NumPy
  • pandas

These libraries can be installed with pip:

pip install tensorflow numpy pandas
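
After installation, a quick sanity check (a minimal sketch, assuming a TensorFlow 2.x install) confirms the version, since the code in this article relies on TF 2.x behavior such as eager execution:

import tensorflow as tf

# This tutorial assumes a 2.x release
print(tf.__version__)
# Eager execution is on by default in TF 2.x
print(tf.executing_eagerly())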

2. Data Preparation

Training a chatbot requires a large amount of conversational data. Here we assume we already have a dataset of question-answer pairs, such as the Cornell Movie Dialogs Corpus. Preprocessing involves cleaning, tokenization, and vectorization.

Data Cleaning and Tokenization

import re
import numpy as np

def preprocess_sentence(sentence):
    sentence = sentence.lower().strip()
    # Put spaces around punctuation so it becomes a separate token
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    # Collapse runs of spaces (and stray double quotes) into a single space
    sentence = re.sub(r'[" "]+', " ", sentence)
    # Replace anything that is not a letter or basic punctuation with a space
    sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
    sentence = sentence.strip()
    return sentence

# Example: load and clean the data
questions = ["How are you?", "What's your name?"]
answers = ["I'm fine, thank you.", "I am a chatbot."]

clean_questions = [preprocess_sentence(q) for q in questions]
clean_answers = [preprocess_sentence(a) for a in answers]

3. Building the Vocabulary

To convert text into a numeric form the model can work with, we need to build a vocabulary.

from collections import Counter

def build_vocab(sentences, threshold=5):
    # Count how often each word appears across all sentences
    word_counts = Counter()
    for sentence in sentences:
        for word in sentence.split():
            word_counts[word] += 1

    # Keep only words that appear at least `threshold` times
    vocab = [word for word, count in word_counts.items() if count >= threshold]
    vocab = sorted(vocab)
    # Reserve fixed indices for the special tokens
    vocab.insert(0, '<PAD>')
    vocab.insert(1, '<START>')
    vocab.insert(2, '<END>')
    vocab.insert(3, '<UNK>')
    return vocab

# threshold=1 keeps every word for this tiny example; use a higher threshold on real data
vocab = build_vocab(clean_questions + clean_answers, threshold=1)
word2idx = {w: i for i, w in enumerate(vocab)}
idx2word = {i: w for i, w in enumerate(vocab)}
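
The training and evaluation code later in the article also needs the sentences converted into padded index tensors, plus the maximum input and target lengths (max_length_inp, max_length_targ), which are never constructed explicitly. Below is a minimal sketch under the conventions used in the rest of the article (post-padding with <PAD> at index 0, and decoder targets wrapped with <START>/<END>); the tensorize helper and its add_tokens flag are illustrative names, not part of the original code:

import tensorflow as tf

def tensorize(sentences, add_tokens=False):
    # Map words to indices, falling back to <UNK> for unseen words;
    # optionally wrap each sentence with <START> ... <END> (needed for decoder targets)
    sequences = []
    for s in sentences:
        ids = [word2idx.get(w, word2idx['<UNK>']) for w in s.split()]
        if add_tokens:
            ids = [word2idx['<START>']] + ids + [word2idx['<END>']]
        sequences.append(ids)
    # Pad all sequences to the same length with the <PAD> index (0)
    return tf.keras.preprocessing.sequence.pad_sequences(sequences, padding='post')

input_tensor = tensorize(clean_questions)                  # encoder inputs
target_tensor = tensorize(clean_answers, add_tokens=True)  # decoder targets start with <START>
max_length_inp = input_tensor.shape[1]
max_length_targ = target_tensor.shape[1]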

4. The Sequence-to-Sequence (Seq2Seq) Model

A Seq2Seq model consists of two parts, an encoder and a decoder. We will use LSTMs as the core component of both.

Model Definition

import tensorflow as tf

class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(self.enc_units,
                                         return_sequences=True,
                                         return_state=True,
                                         recurrent_initializer='glorot_uniform')

    def call(self, x, hidden):
        # x: (batch, seq_len) token ids -> (batch, seq_len, embedding_dim)
        x = self.embedding(x)
        # Return the full output sequence plus the final hidden and cell states
        output, state_h, state_c = self.lstm(x, initial_state=hidden)
        return output, state_h, state_c

    def initialize_hidden_state(self):
        # LSTM state is a pair: [hidden state h, cell state c]
        return [tf.zeros((self.batch_sz, self.enc_units)),
                tf.zeros((self.batch_sz, self.enc_units))]

class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(self.dec_units,
                                         return_sequences=True,
                                         return_state=True,
                                         recurrent_initializer='glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size)

    def call(self, x, hidden, enc_output):
        # Note: this simple decoder does not use enc_output (no attention mechanism);
        # it only consumes the encoder's final states via `hidden`
        x = self.embedding(x)
        output, state_h, state_c = self.lstm(x, initial_state=hidden)
        # Flatten the time dimension so the Dense layer produces per-step logits
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state_h, state_c

embedding_dim = 256
units = 1024
BATCH_SIZE = 64
VOCAB_SIZE = len(vocab)

encoder = Encoder(VOCAB_SIZE, embedding_dim, units, BATCH_SIZE)
decoder = Decoder(VOCAB_SIZE, embedding_dim, units, BATCH_SIZE)
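
Before wiring up training, a quick shape check helps confirm how the encoder and decoder fit together. This is just a sketch with arbitrary dummy inputs (the sequence length 10 is made up):

# Run a dummy batch through both models and inspect the output shapes
sample_hidden = encoder.initialize_hidden_state()
sample_input = tf.zeros((BATCH_SIZE, 10), dtype=tf.int32)   # 10 is an arbitrary sequence length
sample_output, sample_h, sample_c = encoder(sample_input, sample_hidden)
print(sample_output.shape)  # (64, 10, 1024): one hidden vector per time step

sample_dec_input = tf.zeros((BATCH_SIZE, 1), dtype=tf.int32)  # one token per sequence
dec_logits, _, _ = decoder(sample_dec_input, [sample_h, sample_c], sample_output)
print(dec_logits.shape)     # (64, VOCAB_SIZE) after the reshape in Decoder.call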

Training

During training, we feed the input sentence to the encoder and pass the encoder's final states to the decoder, which generates the target sentence one token at a time using teacher forcing.

optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    # Mask out padding positions (index 0, i.e. <PAD>) so they do not contribute to the loss
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)

@tf.function
def train_step(inp, targ, enc_hidden):
    loss = 0
    with tf.GradientTape() as tape:
        enc_output, enc_hidden_h, enc_hidden_c = encoder(inp, enc_hidden)
        dec_hidden = [enc_hidden_h, enc_hidden_c]
        # The decoder starts every sequence with the <START> token
        dec_input = tf.expand_dims([word2idx['<START>']] * BATCH_SIZE, 1)

        for t in range(1, targ.shape[1]):
            predictions, dec_hidden_h, dec_hidden_c = decoder(dec_input, dec_hidden, enc_output)
            loss += loss_function(targ[:, t], predictions)
            dec_hidden = [dec_hidden_h, dec_hidden_c]
            # Teacher forcing: feed the ground-truth token as the next decoder input
            dec_input = tf.expand_dims(targ[:, t], 1)

    batch_loss = (loss / int(targ.shape[1]))
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return batch_loss
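
The article defines train_step but does not show how it is driven. Below is a minimal training-loop sketch, assuming the input_tensor / target_tensor built in the vectorization step above; the number of epochs is illustrative, and BATCH_SIZE must not exceed the dataset size:

import time

EPOCHS = 10  # illustrative; tune for your dataset

# Build a shuffled, batched dataset from the padded index tensors
dataset = tf.data.Dataset.from_tensor_slices((input_tensor, target_tensor))
dataset = dataset.shuffle(len(input_tensor)).batch(BATCH_SIZE, drop_remainder=True)

for epoch in range(EPOCHS):
    start = time.time()
    enc_hidden = encoder.initialize_hidden_state()
    total_loss = 0
    for batch, (inp, targ) in enumerate(dataset):
        batch_loss = train_step(inp, targ, enc_hidden)
        total_loss += batch_loss
    print(f'Epoch {epoch + 1} Loss {float(total_loss):.4f} ({time.time() - start:.1f}s)')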

5. Model Evaluation

Once the model is trained, we can use it to generate replies.

def evaluate(sentence):
    sentence = preprocess_sentence(sentence)
    # Map words to indices, falling back to <UNK> for out-of-vocabulary words
    inputs = [word2idx.get(w, word2idx['<UNK>']) for w in sentence.split(' ')]
    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_length_inp, padding='post')
    inputs = tf.convert_to_tensor(inputs)

    result = ''
    # Batch size is 1 at inference time
    hidden = [tf.zeros((1, units)), tf.zeros((1, units))]
    enc_out, enc_hidden_h, enc_hidden_c = encoder(inputs, hidden)

    dec_hidden = [enc_hidden_h, enc_hidden_c]
    dec_input = tf.expand_dims([word2idx['<START>']], 0)

    for t in range(max_length_targ):
        predictions, dec_hidden_h, dec_hidden_c = decoder(dec_input, dec_hidden, enc_out)
        # Greedy decoding: pick the most likely token at each step
        predicted_id = int(tf.argmax(predictions[0]).numpy())
        result += idx2word[predicted_id] + ' '
        if idx2word[predicted_id] == '<END>':
            return result
        # Carry the decoder state forward and feed the predicted token back in
        dec_hidden = [dec_hidden_h, dec_hidden_c]
        dec_input = tf.expand_dims([predicted_id], 0)

    return result

# Test example
print(evaluate("how are you"))
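
To turn this into an interactive chatbot, evaluate can be wrapped in a simple input loop. This is just a usage sketch; the 'quit' exit word is an arbitrary convention, not part of the original code:

# Minimal interactive loop around evaluate()
while True:
    user_input = input('You: ')
    if user_input.strip().lower() == 'quit':
        break
    print('Bot:', evaluate(user_input))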