近期项目有了一个过滤敏感词的功能需求,在网上找了一些方法及解说,发现DFA算法比较好用,容易实现,但很多文章解释得不太清楚,这里将其详细描述,并用python代码实现.
DFA,网上常译为确定有限自动机,俗话就是新数据在已有的数据基础上去有限次查找,比如:新数据为苹果公司年会,旧数据为苹果公司,这样就可以直接旧数据后面添加"年会"两个字. DFA算法思想是将敏感词组通过建立嵌套字典的方式去构建敏感词链表(以一个特定字符’\x00’作为结束),然后遍历新词的每个字是否出现在敏感词链表中,如果出现,即为敏感词,提取出来,可以用"*"代替. 例如:敏感词词组有:新疆骚乱,爆炸,骚乱 则构建出来的敏感词链表分别为: ‘新’: {‘疆’: {‘骚’: {‘乱’: {’\x00’: 0}}}} ‘爆’: {‘炸’: {’\x00’: 0}} ‘骚’: {‘乱’: {’\x00’: 0}} 合为一个链表为:{‘新’: {‘疆’: {‘骚’: {‘乱’: {’\x00’: 0}}}}, ‘爆’: {‘炸’: {’\x00’: 0}}, ‘骚’: {‘乱’: {’\x00’: 0}}} 其中’\x00’是指以该字符作为链表结尾 这样,当新词作为输入遍历敏感词链表时,遇到特定字符\x00才算敏感词识别出来,否则都不算入敏感词
采用类管理,初始化一个空字典链表,特定结束符以及生成敏感词链表的函数
Proj_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) SensitiveWord_Path = os.path.join(Proj_path, "main/minganci.txt") class DFAFilter(object): def __init__(self): self.keyword_chains = {} # 关键词链表 self.delimit = '\x00' # 限定 self.parse(SensitiveWord_Path)接下来是parse()读取敏感词库表,通过add()将敏感词以嵌套字典的方式形成链表,即为level,访问level即可匹配敏感词
def parse(self, path): with open(path, encoding='utf-8') as f: for keyword in f: self.add(str(keyword).strip()) def add(self, keyword): keyword = keyword.lower() # 关键词英文变为小写 chars = keyword.strip() # 关键字去除首尾空格和换行 if not chars: # 如果关键词为空直接返回 return level = self.keyword_chains # 遍历关键字的每个字 for i in range(len(chars)): # 如果这个字已经存在字符链的key中就进入其子字典 if chars[i] in level: level = level[chars[i]] else: if not isinstance(level, dict): break for j in range(i, len(chars)): level[chars[j]] = {} last_level, last_char = level, chars[j] level = level[chars[j]] last_level[last_char] = {self.delimit: 0} break if i == len(chars) - 1: # 最后一个字 level[self.delimit] = 0 print(level)最后,将传入的词组进行敏感词过滤,遍历新词组,只有当前字在敏感词链表中才能加入敏感词
def filter(self, message, repl="*"): message = message.lower() ret = [] start = 0 while start < len(message): level = self.keyword_chains step_ins = 0 for char in message[start:]: # 新字在敏感词字典链表中 if char in level: step_ins += 1 # 特定字符不在当前字的value值里,嵌套遍历下一个 if self.delimit not in level[char]: level = level[char] else: # 将敏感词以*代替,跳出for循环 ret.append(repl * step_ins) start += step_ins - 1 break else: # 新字不在敏感词字典链表中 ret.append(message[start]) break else: ret.append(message[start]) start += 1 return ''.join(ret)