    Salute! A PyTorch Implementation of Seq2Seq


      Video walkthrough on Bilibili (B站)

      This post shows how to reproduce Seq2Seq in PyTorch and build a simple machine-translation application. It is worth first skimming the paper Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation (2014) to get a clear picture of what the Seq2Seq architecture looks like; reading this post afterwards will pay off twice over.

      I have looked at many Seq2Seq architecture diagrams, and I find the one provided by the official PyTorch tutorial the easiest to understand.

      First, as the figure above makes clear, Seq2Seq operates on three variables, which is unlike any network architecture I had encountered before. We call the Encoder's input enc_input, the Decoder's input dec_input, and the Decoder's output dec_output. The concrete example below walks through the whole Seq2Seq workflow.
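
      For instance, with the letter-level pair 'man' → 'women' from the dataset used later in this post (n_step = 5, with 'S'/'E'/'?' as the start/end/padding symbols), the three sequences look like this:

      # The three sequences for the training pair 'man' -> 'women', using the conventions
      # defined later in this post (n_step = 5, 'S' = start, 'E' = end, '?' = padding)
      enc_input  = 'man??E'    # Encoder input: source word, padded, plus the end token
      dec_input  = 'Swomen'    # Decoder input: start token plus the target word
      dec_output = 'womenE'    # Decoder output: target word plus the end token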

      The figure below shows an Encoder built from an LSTM. Its input is each character of "go away" (including the space), and we only need the hidden-state information from the last time step, namely $h_t$ and $c_t$.

      The $h_t$ and $c_t$ output by the Encoder are then used as the Decoder's initial hidden state $h_0$ and $c_0$, as the next figure shows. At the same time, the Decoder's input at the first time step is a token marking the start of a sentence (user-defined; "<SOS>", "\t", "S", etc. all work; "\t" is used here as the example). This yields the output "m" together with the new hidden states $h_1$ and $c_1$.

      Next, $h_1$, $c_1$, and "m" are fed in as input, producing the output "a" and the new hidden states $h_2$ and $c_2$.

      Repeat these steps until the sentence-end token is finally produced (user-defined; "<EOS>", "\n", "E", etc. all work; "\n" is used here as the example).
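
      To make the state handoff in the figures concrete, here is a minimal sketch (my own illustration with stand-in sizes and random data, not the model defined later in this post) of the encode-then-decode loop with nn.LSTMCell, where the Encoder's final $(h_t, c_t)$ seeds the Decoder's initial $(h_0, c_0)$ and decoding stops at the end symbol:

      import torch
      import torch.nn as nn
      
      n_class, n_hidden, eos_idx = 29, 128, 1    # assumed sizes; index 1 stands in for the end symbol
      enc_cell = nn.LSTMCell(n_class, n_hidden)
      dec_cell = nn.LSTMCell(n_class, n_hidden)
      fc = nn.Linear(n_hidden, n_class)
      
      enc_steps = torch.randn(7, 1, n_class)     # stand-in for the one-hot letters of "go away"
      h = torch.zeros(1, n_hidden)
      c = torch.zeros(1, n_hidden)
      for x in enc_steps:                        # run the Encoder; keep only the last h_t, c_t
          h, c = enc_cell(x, (h, c))
      
      dec_x = torch.zeros(1, n_class)            # stand-in for the start symbol ("\t" / "S")
      for _ in range(20):                        # decode step by step from (h_0, c_0) = (h_t, c_t)
          h, c = dec_cell(dec_x, (h, c))
          pred = fc(h).argmax(-1)                # greedy prediction at this time step
          if pred.item() == eos_idx:             # stop once the end symbol is produced
              break
          dec_x = torch.eye(n_class)[pred]       # feed the prediction back as the next input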

      A few questions often come up about the Decoder, so let me answer them here:

      • During training, what if the Decoder never stops, i.e. never outputs the sentence-end token?

        • During training, the length of the sentence the Decoder should output is known in advance. Suppose the current time step is already the last character of that length and the prediction is still not the end token; that is fine: just stop there and compute the loss.
      • During testing, what if the Decoder never stops? For example, the prediction is "wasd s w \n sdsw \n…" and keeps going forever.

        • It cannot run forever, because during testing the Decoder still receives an input; it is just a sequence of meaningless placeholders, e.g. many "<pad>" symbols. Since the Decoder's input has a finite length, its output has a finite length as well. We then simply take all the characters before the first end token; for the example above, the final prediction is "wasd s w".
      • What is the relationship between the Decoder's input and output, i.e. dec_input and dec_output?

        • In the training phase, no matter what character the Decoder outputs at the current time step, the Decoder's input at the next time step follows the original "plan". For example, suppose dec_input="\twasted". After "\t" is fed in, the Decoder might output the letter "m"; we just record it, and it does not affect the fact that the Decoder is fed the letter "w" at the next time step.
        • In the validation or test phase, the Decoder's output at each time step does affect its next input, because at validation/test time the network cannot see the target, so it can only keep looping over its own outputs. For example, say we want to translate the English "wasted" into the German "verschwenden". The Decoder first receives "\t" and produces an output, say "m"; at the next step it takes "m" as input and produces another output, say "a"; then "a" becomes the next input, and so on until the final time step. (A small sketch contrasting the two phases follows this list.)
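
      To make the last two points concrete, here is a tiny sketch (reusing the hypothetical "wasted" example above; not code from the original post) of how the Decoder's input differs between the two phases:

      # Training (teacher forcing): the Decoder's input is fixed in advance from the target,
      # so whatever the Decoder predicts at step t does NOT change what is fed at step t+1.
      target     = 'wasted'
      dec_input  = '\t' + target     # '\twasted' -- fed to the Decoder exactly as planned
      dec_output = target + '\n'     # 'wasted\n' -- what the Decoder is trained to emit
      
      # Validation / testing: there is no target, so each prediction becomes the next input
      # (a greedy feedback loop, as in the LSTMCell sketch above and translate() further below).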

      As an aside, I personally think Seq2Seq is very similar to an AutoEncoder.

      Now on to the code.

      First, the imports. I use 'S' as the start token and 'E' as the end token; if an input or output is too short, I pad it with '?'.

      # code by Tae Hwan Jung(Jeff Jung) @graykode, modify by wmathor
      import torch
      import numpy as np
      import torch.nn as nn
      import torch.utils.data as Data
      
      device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
      # S: Symbol that shows starting of decoding input
      # E: Symbol that shows ending of decoding output
      # ?: Symbol that fills in the blank sequence if the current sequence is shorter than n_step
      

      Next, define the dataset and the hyperparameters. The dataset here is deliberately tiny; you can regard it as a translation task, except that it translates English into English.

      n_step stores the length of the longest word; every word shorter than that is padded at the end with '?'.

      letter = [c for c in 'SE?abcdefghijklmnopqrstuvwxyz']
      letter2idx = {n: i for i, n in enumerate(letter)}
      
      seq_data = [['man', 'women'], ['black', 'white'], ['king', 'queen'], ['girl', 'boy'], ['up', 'down'], ['high', 'low']]
      
      # Seq2Seq Parameter
      n_step = max([max(len(i), len(j)) for i, j in seq_data]) # max_len(=5)
      n_hidden = 128
      n_class = len(letter2idx) # classification problem
      batch_size = 3
      

      Next comes the data preprocessing. The main steps: first, pad words that are too short with '?'; then append the end token 'E' to the Encoder's input, prepend the start token 'S' to the Decoder's input, and append the end token 'E' to the Decoder's output, as the figure below shows.

      def make_data(seq_data):
          enc_input_all, dec_input_all, dec_output_all = [], [], []
      
          for seq in seq_data:
              for i in range(2):
                  seq[i] = seq[i] + '?' * (n_step - len(seq[i])) # 'man??', 'women'
      
              enc_input = [letter2idx[n] for n in (seq[0] + 'E')] # ['m', 'a', 'n', '?', '?', 'E']
              dec_input = [letter2idx[n] for n in ('S' + seq[1])] # ['S', 'w', 'o', 'm', 'e', 'n']
              dec_output = [letter2idx[n] for n in (seq[1] + 'E')] # ['w', 'o', 'm', 'e', 'n', 'E']
      
              enc_input_all.append(np.eye(n_class)[enc_input])
              dec_input_all.append(np.eye(n_class)[dec_input])
              dec_output_all.append(dec_output) # not one-hot
      
          # make tensor
          return torch.Tensor(enc_input_all), torch.Tensor(dec_input_all), torch.LongTensor(dec_output_all)
      
      '''
      enc_input_all: [6, n_step+1 (because of 'E'), n_class]
      dec_input_all: [6, n_step+1 (because of 'S'), n_class]
      dec_output_all: [6, n_step+1 (because of 'E')]
      '''
      enc_input_all, dec_input_all, dec_output_all = make_data(seq_data)
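
      As a quick sanity check (my addition, not in the original post), the shapes match the comment above: 6 samples, n_step+1 = 6 time steps, and n_class = 29 letters:

      print(enc_input_all.shape)   # torch.Size([6, 6, 29])
      print(dec_input_all.shape)   # torch.Size([6, 6, 29])
      print(dec_output_all.shape)  # torch.Size([6, 6])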
      

      Since three tensors have to be returned here, we need a custom Dataset: concretely, subclass the torch.utils.data.Dataset class and implement its __len__ and __getitem__ methods.

      class TranslateDataSet(Data.Dataset):
          def __init__(self, enc_input_all, dec_input_all, dec_output_all):
              self.enc_input_all = enc_input_all
              self.dec_input_all = dec_input_all
              self.dec_output_all = dec_output_all
          
          def __len__(self): # return dataset size
              return len(self.enc_input_all)
          
          def __getitem__(self, idx):
              return self.enc_input_all[idx], self.dec_input_all[idx], self.dec_output_all[idx]
      
      loader = Data.DataLoader(TranslateDataSet(enc_input_all, dec_input_all, dec_output_all), batch_size, True)
      

      Next, define the Seq2Seq model. I use a plain RNN as both the encoder and the decoder. If you are reasonably familiar with RNNs, there is not much to say about the network definition; the comments spell everything out, including how the tensor dimensions change.

      # Model
      class Seq2Seq(nn.Module):
          def __init__(self):
              super(Seq2Seq, self).__init__()
              self.encoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # encoder (with a single layer, dropout has no effect and PyTorch will warn)
              self.decoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # decoder (same note as above)
              self.fc = nn.Linear(n_hidden, n_class)
      
          def forward(self, enc_input, enc_hidden, dec_input):
              # enc_input(=input_batch): [batch_size, n_step+1, n_class]
              # dec_input(=output_batch): [batch_size, n_step+1, n_class]
              enc_input = enc_input.transpose(0, 1) # enc_input: [n_step+1, batch_size, n_class]
              dec_input = dec_input.transpose(0, 1) # dec_input: [n_step+1, batch_size, n_class]
      
              # h_t : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
              _, h_t = self.encoder(enc_input, enc_hidden)
              # outputs : [n_step+1, batch_size, num_directions(=1) * n_hidden(=128)]
              outputs, _ = self.decoder(dec_input, h_t)
      
              model = self.fc(outputs) # model : [n_step+1, batch_size, n_class]
              return model
      
      model = Seq2Seq().to(device)
      criterion = nn.CrossEntropyLoss().to(device)
      optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
      

      Now the training loop. Because the output pred is a 3-D tensor, the loss has to be computed sample by sample, which is what the for loop below does. (A vectorized alternative is sketched right after the training code.)

      for epoch in range(5000):
        for enc_input_batch, dec_input_batch, dec_output_batch in loader:
            # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
            h_0 = torch.zeros(1, batch_size, n_hidden).to(device)
      
            (enc_input_batch, dec_input_batch, dec_output_batch) = (enc_input_batch.to(device), dec_input_batch.to(device), dec_output_batch.to(device))
            # enc_input_batch : [batch_size, n_step+1, n_class]
            # dec_input_batch : [batch_size, n_step+1, n_class]
            # dec_output_batch : [batch_size, n_step+1], not one-hot
            pred = model(enc_input_batch, h_0, dec_input_batch)
            # pred : [n_step+1, batch_size, n_class]
            pred = pred.transpose(0, 1) # [batch_size, n_step+1(=6), n_class]
            loss = 0
            for i in range(len(dec_output_batch)):
                # pred[i] : [n_step+1, n_class]
                # dec_output_batch[i] : [n_step+1]
                loss += criterion(pred[i], dec_output_batch[i])
            if (epoch + 1) % 1000 == 0:
                print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
                
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
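
      As an aside (not part of the original post), the per-sample loop can also be replaced by a single flattened call to CrossEntropyLoss. Since all sequences here are padded to the same length, the two differ only by a constant factor (batch_size): the loop sums per-sample mean losses, while the flattened version averages over all positions in the batch:

      # Vectorized alternative (sketch): flatten the batch and time dimensions so that
      # CrossEntropyLoss sees [batch_size * (n_step+1), n_class] logits and matching targets.
      loss = criterion(pred.reshape(-1, n_class), dec_output_batch.reshape(-1))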
      

      As the test code below shows, at test time the Decoder's input is just meaningless placeholders, and the number of placeholder positions equals the maximum length n_step. We then find the position of the first end token in the output and keep all the characters before it.

      # Test
      def translate(word):
          enc_input, dec_input, _ = make_data([[word, '?' * n_step]])
          enc_input, dec_input = enc_input.to(device), dec_input.to(device)
          # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
          hidden = torch.zeros(1, 1, n_hidden).to(device)
          output = model(enc_input, hidden, dec_input)
          # output : [n_step+1, batch_size, n_class]
      
          predict = output.data.max(2, keepdim=True)[1] # select n_class dimension
          decoded = [letter[i] for i in predict]
          translated = ''.join(decoded[:decoded.index('E')])
      
          return translated.replace('?', '')
      
      print('test')
      print('man ->', translate('man'))
      print('mans ->', translate('mans'))
      print('king ->', translate('king'))
      print('black ->', translate('black'))
      print('up ->', translate('up'))
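
      The placeholder trick works here because every target has the same known length. Purely for illustration (my own sketch, not part of the original post), the same trained encoder/decoder can also be run truly autoregressively, feeding each predicted letter back in until 'E' appears, as discussed in the Q&A above; since the model was trained with '?' placeholder inputs, the results may differ from translate():

      # Sketch: step-by-step greedy decoding that reuses model.encoder / model.decoder / model.fc,
      # feeding each prediction back as the next decoder input instead of '?' placeholders.
      def translate_greedy(word):
          word = word + '?' * (n_step - len(word))                 # pad the source word
          enc = torch.Tensor(np.eye(n_class)[[letter2idx[c] for c in word + 'E']])
          enc = enc.unsqueeze(1).to(device)                        # [n_step+1, 1, n_class]
          with torch.no_grad():
              _, h = model.encoder(enc, torch.zeros(1, 1, n_hidden).to(device))
              dec_in = torch.Tensor(np.eye(n_class)[[letter2idx['S']]]).unsqueeze(1).to(device)
              result = ''
              for _ in range(n_step + 1):                          # at most n_step+1 steps
                  out, h = model.decoder(dec_in, h)                # one decoding step
                  pred = model.fc(out).argmax(-1).item()           # greedy pick
                  if letter[pred] == 'E':                          # stop at the end token
                      break
                  result += letter[pred]
                  dec_in = torch.Tensor(np.eye(n_class)[[pred]]).unsqueeze(1).to(device)
          return result.replace('?', '')
      
      print('man (greedy) ->', translate_greedy('man'))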
      

      The complete code is as follows

      # code by Tae Hwan Jung(Jeff Jung) @graykode, modify by wmathor
      import torch
      import numpy as np
      import torch.nn as nn
      import torch.utils.data as Data
      
      device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
      # S: Symbol that shows starting of decoding input
      # E: Symbol that shows ending of decoding output
      # ?: Symbol that fills in the blank sequence if the current sequence is shorter than n_step
      
      letter = [c for c in 'SE?abcdefghijklmnopqrstuvwxyz']
      letter2idx = {n: i for i, n in enumerate(letter)}
      
      seq_data = [['man', 'women'], ['black', 'white'], ['king', 'queen'], ['girl', 'boy'], ['up', 'down'], ['high', 'low']]
      
      # Seq2Seq Parameter
      n_step = max([max(len(i), len(j)) for i, j in seq_data]) # max_len(=5)
      n_hidden = 128
      n_class = len(letter2idx) # classification problem
      batch_size = 3
      
      def make_data(seq_data):
          enc_input_all, dec_input_all, dec_output_all = [], [], []
      
          for seq in seq_data:
              for i in range(2):
                  seq[i] = seq[i] + '?' * (n_step - len(seq[i])) # 'man??', 'women'
      
              enc_input = [letter2idx[n] for n in (seq[0] + 'E')] # ['m', 'a', 'n', '?', '?', 'E']
              dec_input = [letter2idx[n] for n in ('S' + seq[1])] # ['S', 'w', 'o', 'm', 'e', 'n']
              dec_output = [letter2idx[n] for n in (seq[1] + 'E')] # ['w', 'o', 'm', 'e', 'n', 'E']
      
              enc_input_all.append(np.eye(n_class)[enc_input])
              dec_input_all.append(np.eye(n_class)[dec_input])
              dec_output_all.append(dec_output) # not one-hot
      
          # make tensor
          return torch.Tensor(enc_input_all), torch.Tensor(dec_input_all), torch.LongTensor(dec_output_all)
      
      '''
      enc_input_all: [6, n_step+1 (because of 'E'), n_class]
      dec_input_all: [6, n_step+1 (because of 'S'), n_class]
      dec_output_all: [6, n_step+1 (because of 'E')]
      '''
      enc_input_all, dec_input_all, dec_output_all = make_data(seq_data)
      
      class TranslateDataSet(Data.Dataset):
          def __init__(self, enc_input_all, dec_input_all, dec_output_all):
              self.enc_input_all = enc_input_all
              self.dec_input_all = dec_input_all
              self.dec_output_all = dec_output_all
          
          def __len__(self): # return dataset size
              return len(self.enc_input_all)
          
          def __getitem__(self, idx):
              return self.enc_input_all[idx], self.dec_input_all[idx], self.dec_output_all[idx]
      
      loader = Data.DataLoader(TranslateDataSet(enc_input_all, dec_input_all, dec_output_all), batch_size, True)
      
      # Model
      class Seq2Seq(nn.Module):
          def __init__(self):
              super(Seq2Seq, self).__init__()
              self.encoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # encoder (with a single layer, dropout has no effect and PyTorch will warn)
              self.decoder = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5) # decoder (same note as above)
              self.fc = nn.Linear(n_hidden, n_class)
      
          def forward(self, enc_input, enc_hidden, dec_input):
              # enc_input(=input_batch): [batch_size, n_step+1, n_class]
              # dec_input(=output_batch): [batch_size, n_step+1, n_class]
              enc_input = enc_input.transpose(0, 1) # enc_input: [n_step+1, batch_size, n_class]
              dec_input = dec_input.transpose(0, 1) # dec_input: [n_step+1, batch_size, n_class]
      
              # h_t : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
              _, h_t = self.encoder(enc_input, enc_hidden)
              # outputs : [n_step+1, batch_size, num_directions(=1) * n_hidden(=128)]
              outputs, _ = self.decoder(dec_input, h_t)
      
              model = self.fc(outputs) # model : [n_step+1, batch_size, n_class]
              return model
      
      model = Seq2Seq().to(device)
      criterion = nn.CrossEntropyLoss().to(device)
      optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
      
      for epoch in range(5000):
        for enc_input_batch, dec_input_batch, dec_output_batch in loader:
            # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
            h_0 = torch.zeros(1, batch_size, n_hidden).to(device)
      
            (enc_input_batch, dec_input_batch, dec_output_batch) = (enc_input_batch.to(device), dec_input_batch.to(device), dec_output_batch.to(device))
            # enc_input_batch : [batch_size, n_step+1, n_class]
            # dec_input_batch : [batch_size, n_step+1, n_class]
            # dec_output_batch : [batch_size, n_step+1], not one-hot
            pred = model(enc_input_batch, h_0, dec_input_batch)
            # pred : [n_step+1, batch_size, n_class]
            pred = pred.transpose(0, 1) # [batch_size, n_step+1(=6), n_class]
            loss = 0
            for i in range(len(dec_output_batch)):
                # pred[i] : [n_step+1, n_class]
                # dec_output_batch[i] : [n_step+1]
                loss += criterion(pred[i], dec_output_batch[i])
            if (epoch + 1) % 1000 == 0:
                print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
                
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
          
      # Test
      def translate(word):
          enc_input, dec_input, _ = make_data([[word, '?' * n_step]])
          enc_input, dec_input = enc_input.to(device), dec_input.to(device)
          # make hidden shape [num_layers * num_directions, batch_size, n_hidden]
          hidden = torch.zeros(1, 1, n_hidden).to(device)
          output = model(enc_input, hidden, dec_input)
          # output : [n_step+1, batch_size, n_class]
      
          predict = output.data.max(2, keepdim=True)[1] # select n_class dimension
          decoded = [letter[i] for i in predict]
          translated = ''.join(decoded[:decoded.index('E')])
      
          return translated.replace('?', '')
      
      print('test')
      print('man ->', translate('man'))
      print('mans ->', translate('mans'))
      print('king ->', translate('king'))
      print('black ->', translate('black'))
      print('up ->', translate('up'))
      