自然语言处理(实现bigram)
-
题目描述
思路
先观察下给定的语料库是什么样的
从上图可以看出,每一行语料有三列,第一列表示这个句子的ID,第二列是句子,第三列是第二列句子的规范形式。具体来说,第二列句子中有可能会出现阿拉伯数字等一些特殊字符,那么第三列就会将这些字符转换成英文读音(例如将1455改写为fourteen fifty-five)
了解完语料之后整理一下思路。我们需要得到的bigram是一张大表,即一个n×nn\times nn×n的矩阵,其中nnn表示不重复的单词个数。这个矩阵第iii行第jjj列的值表示:前一个词是wiw_iwi,当前词是wjw_jwj的概率。例如下面这个矩阵,第一行第二列表示,前一个词是iii,当前词是wantwantwant的概率为0.33
直接计算这个概率似乎是非常难的,我们应该先计算频次,即同样是这个n×nn\times nn×n的矩阵,但这个矩阵里的值不再是频率,而是频次。例如下面这个矩阵,第二行第三列表示,前一个词是wantwantwant,当前词是tototo总共出现了608次
有了这个频次表之后,只需再统计一下每个词出现的次数,用这个频次表的每一行除以每个词出现的次数,就得到频率了。例如下面是所有词出现的次数
代码
具体的代码实现中有很多细节,例如单词大小写,标点符号处理,以及平滑方法等等
首先获取第三列的句子,将其去除标点符号,并且将所有单词转为小写(因为大小写不同的单词会被认为是两个不同的单词,这样在统计的时候似乎不太合理),并且在句子的开头和结尾分别添加上
<s>
和</s>
import re import numpy as np def removePunctuation(sentence_str): '''给定字符串,进行特殊符号过滤,以及小写转换 Args: sentence_str (str): 句子 Returns: sentence_str.lower() (str): 预处理过的句子 ''' punctuation = '.!,;:?\-"\'' sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str) return sentence_str.lower() def get_sentence(filename): '''从给定文件中获取句子 Args: filename (str): 语料库的文件名 Returns: sentences_list (list): 1-D,存放了所有经过预处理后的句子 ''' sentences_list = [] with open(filename, encoding='utf-8') as f: for line in f.readlines(): sentence_str = removePunctuation(line.strip().split('|')[-1]) sentences_list.append('<s> ' + sentence_str + ' </s>') return sentences_list
接着统计每个单词出现的次数
def count_word(sentences_list): '''给定大量句子,统计出所有单词出现的频次 Args: sentences_list (list): 所有经过预处理后的句子 Returns: wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234} ''' wordcount_dict = {} for sentence_str in sentences_list: for word in sentence_str.split(): if word in wordcount_dict: wordcount_dict[word] += 1 else: wordcount_dict[word] = 1 return wordcount_dict
构建一个二维矩阵,不论是用list还是numpy,他们进行索引的方式都是下标,而无法直接使用单词。所以我们应该先构建一个单词到索引的映射,以及索引到单词的映射
def word2idx(wordcount_dict): '''构建单词到索引的映射与逆映射 Args: wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数 Returns: word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0} idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'} ''' word2idx_dict = {} idx2word_dict = {} for idx, word in enumerate(list(wordcount_dict.keys())): word2idx_dict[word] = idx idx2word_dict[idx] = word return word2idx_dict, idx2word_dict
接下来要做的就是统计两个单词同时出现的次数。基本做法就是遍历每个句子,同时遍历句子中的每个单词。记前一个词为wiw_iwi,当前词为wjw_jwj,通过
word2idx_dict
查得wiw_iwi对应的索引为iii,wjw_jwj对应的索引为jjj,则矩阵中(i,j)(i,j)(i,j)位置的值就加1。最后我还设置了一个可选项,如果用户想要使用加一平滑,那一开始就生成一个全1的矩阵;如果不用平滑,一开始生成的是全0的矩阵def c_table(word2idx_dict, sentences_list, smooth=False): '''构建两个单词连续出现的频次矩阵 Args: word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引 sentences_list (list): 所有经过预处理后的句子 smooth (bool): 是否进行加一平滑 Returns: c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a ''' n = len(word2idx_dict) # 单词个数 if smooth: # 加一平滑 c_table_np = np.ones((n, n)) # n*n 全1矩阵 else: c_table_np = np.zeros((n, n)) # n*n 全0矩阵 for sentence_str in sentences_list: words_list = sentence_str.split() # ['i', 'like', 'apple'] for i in range(1, len(words_list)): w_i = word2idx_dict[words_list[i]] # w_i w_j = word2idx_dict[words_list[i-1]] # w_{i-1} c_table_np[w_j][w_i] += 1 return c_table_np
最后用上述生成的
c_table_np
的每一行同除以wordcount_dict
中的每个值即可,下面代码利用了numpy的广播机制,加快了运算速度def compute_bigram_table(c_table_np, wordcount_dict): '''构建bigram概率矩阵 Args: c_table_np (numpy): bigram频次矩阵 wordcount_dict (dict): 所有单词出现的次数 Returns: c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵 ''' count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...] return c_table_np / count_np[:, None]
如果想要获取一个句子的bigram概率,连乘即可
def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list): '''计算每个句子的bigram概率 Args: bigram_table_np (numpy): bigram概率矩阵 word2idx_dict (dict): 单词到索引的映射 sentences_list (list): 预处理后的句子 Returns: scores_list (list): 所有句子的bigram概率 ''' scores_list = [] for sentence_str in sentences_list: words_list = sentence_str.split() score = 1 for i in range(1, len(words_list)): w_i = word2idx_dict[words_list[i]] # w_i w_j = word2idx_dict[words_list[i-1]] # w_{i-1} score *= bigram_table_np[w_j][w_i] scores_list.append(score) return scores_list
完整代码如下:
import re import numpy as np def removePunctuation(sentence_str): '''给定字符串,进行特殊符号过滤,以及小写转换 Args: sentence_str (str): 句子 Returns: sentence_str.lower() (str): 预处理过的句子 ''' punctuation = '.!,;:?\-"\'' sentence_str = re.sub(r'[{}]+'.format(punctuation),'',sentence_str) return sentence_str.lower() def get_sentence(filename): '''从给定文件中获取句子 Args: filename (str): 语料库的文件名 Returns: sentences_list (list): 1-D,存放了所有经过预处理后的句子 ''' sentences_list = [] with open(filename, encoding='utf-8') as f: for line in f.readlines(): sentence_str = removePunctuation(line.strip().split('|')[-1]) sentences_list.append('<s> ' + sentence_str + ' </s>') return sentences_list def count_word(sentences_list): '''给定大量句子,统计出所有单词出现的频次 Args: sentences_list (list): 所有经过预处理后的句子 Returns: wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数,例如{'the': 1234} ''' wordcount_dict = {} for sentence_str in sentences_list: for word in sentence_str.split(): if word in wordcount_dict: wordcount_dict[word] += 1 else: wordcount_dict[word] = 1 return wordcount_dict def word2idx(wordcount_dict): '''构建单词到索引的映射与逆映射 Args: wordcount_dict (dict): 键是str类型,表示单词;值是int类型,表示次数 Returns: word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引,例如{'the': 0} idx2word_dict (dict): 键是int类型,表示索引;值是str类型,表示单词,例如{0: 'the'} ''' word2idx_dict = {} idx2word_dict = {} for idx, word in enumerate(list(wordcount_dict.keys())): word2idx_dict[word] = idx idx2word_dict[idx] = word return word2idx_dict, idx2word_dict def c_table(word2idx_dict, sentences_list, smooth=False): '''构建两个单词连续出现的频次矩阵 Args: word2idx_dict (dict): 键是str类型,表示单词;值是int类型,表示索引 sentences_list (list): 所有经过预处理后的句子 smooth (bool): 是否进行加一平滑 Returns: c_table_np (numpy): 2-D,c_table_np[i][j] = a表示 前一个索引为i的词和当前索引为j的词 同时出现的次数为a ''' n = len(word2idx_dict) # 单词个数 c_table_np = np.zeros((n, n)) # n*n 全0矩阵 for sentence_str in sentences_list: words_list = sentence_str.split() # ['i', 'like', 'apple'] for i in range(1, len(words_list)): w_i = word2idx_dict[words_list[i]] # w_i w_j = word2idx_dict[words_list[i-1]] # w_{i-1} c_table_np[w_j][w_i] += 1 if smooth: # 加一平滑 c_table_np[c_table_np == 0] = 1 return c_table_np def compute_bigram_table(c_table_np, wordcount_dict): '''构建bigram概率矩阵 Args: c_table_np (numpy): bigram频次矩阵 wordcount_dict (dict): 所有单词出现的次数 Returns: c_table_np / count_np[:, None] (numpy): 2-D,bigram概率矩阵 ''' count_np = np.array(list(wordcount_dict.values())) # [800, 900, 788, ...] return c_table_np / count_np[:, None] def compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list): '''计算每个句子的bigram概率 Args: bigram_table_np (numpy): bigram概率矩阵 word2idx_dict (dict): 单词到索引的映射 sentences_list (list): 预处理后的句子 Returns: scores_list (list): 所有句子的bigram概率 ''' scores_list = [] for sentence_str in sentences_list: words_list = sentence_str.split() score = 1 for i in range(1, len(words_list)): w_i = word2idx_dict[words_list[i]] # w_i w_j = word2idx_dict[words_list[i-1]] # w_{i-1} score *= bigram_table_np[w_j][w_i] scores_list.append(score) return scores_list if __name__ == '__main__': sentences_list = get_sentence('metadata.txt') wordcount_dict = count_word(sentences_list) word2idx_dict, idx2word_dict = word2idx(wordcount_dict) c_table_np = c_table(word2idx_dict, sentences_list, True) bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict) scores_list = compute_sentence_bigram(bigram_table_np, word2idx_dict, sentences_list) print(scores_list[:10])
以上是第一题的代码,第二题实际上也比较简单,在第一题的基础上稍作修改即可。例如用户输入thethethe这个单词,首先通过
word2idx_dict
查出$the$对应的下标,记作$k$。然后在bigran_table_np
中寻找第$k$行最大的前5个数对应的下标即可 (argmax)。得到下标,再通过idx2word
返回单词。下面我只贴出不一样部分的代码,相同的代码不再重复import heapq def get_recommend_words(idx, bigram_table_np, idx2word_dict): m_list = list(bigram_table_np[idx]) max_number = heapq.nlargest(7, m_list) max_index_list = [] for t in max_number: index = m_list.index(t) max_index_list.append(index) m_list[index] = 0 return [idx2word_dict[i] for i in max_index_list if idx2word_dict[i] not in ['<s>', '</s>']] if __name__ == '__main__': sentences_list = get_sentence('metadata.txt') wordcount_dict = count_word(sentences_list) word2idx_dict, idx2word_dict = word2idx(wordcount_dict) c_table_np = c_table(word2idx_dict, sentences_list, False) bigram_table_np = compute_bigram_table(c_table_np, wordcount_dict) while True: print('Please input a word: ', end="") word = input().lower() if word not in word2idx_dict: print('None') break else: recommend_word = get_recommend_words(word2idx_dict[word], bigram_table_np, idx2word_dict) print(recommend_word[:5])
-
大佬的文章,每次都要期待的