Python3 使用 dpkt 包解析 DNS 数据包

it2025-02-14  5

从网卡直接抓包,只过滤端口53,过滤出dpkt.ip.IP类和dpkt.udp.UDP类。提取DNS包的各种信息储存为字典格式。代码如下。

"""Capture DNS traffic on a network interface and decode it with dpkt.

Every DNS message carried over UDP/IPv4 is turned into a plain dictionary
(header fields, first question, and -- for responses -- the answer records)
and printed.
"""
import socket
import sys

import dpkt
import pcap

# RR types
DNS_A = 1
DNS_NS = 2
DNS_CNAME = 5
DNS_SOA = 6
DNS_NULL = 10
DNS_PTR = 12
DNS_HINFO = 13
DNS_MX = 15
DNS_TXT = 16
DNS_AAAA = 28
DNS_SRV = 33
DNS_OPT = 41
DNS_UID = 65

# RR classes
DNS_IN = 1
DNS_CHAOS = 3
DNS_HESIOD = 4
DNS_ANY = 255

# Numeric RR type -> display name.
RR_TYPES = {
    1: "A",
    2: "NS",
    5: "CNAME",
    6: "SOA",
    10: "NULL",
    11: "WKS",
    12: "PTR",
    13: "HINFO",
    15: "MX",
    16: "TXT",
    28: "AAAA",
    33: "SRV",
    41: "OPT",
    65: "UID",
    252: "AXFR",
    255: "ANY",
    35: "unknown",
    48: "unknown",
    43: "unknown",
    32: "unknown",
}

# Numeric RR class -> display name.
RR_CLASSES = {
    1: "IN",
    3: "CHAOS",
    4: "HESIOD",
    255: "ANY",
}


def mac_addr(mac):
    """Format a 6-byte MAC address as colon-separated lowercase hex."""
    return '%02x:%02x:%02x:%02x:%02x:%02x' % tuple(mac)


def ip_addr(ip):
    """Convert a packed IP address to its textual form.

    The address is tried as IPv4 first; if its length does not fit, it is
    interpreted as IPv6.

    Raises:
        ValueError: if ``ip`` has neither the IPv4 nor the IPv6 length.
    """
    try:
        return socket.inet_ntop(socket.AF_INET, ip)  # IPv4 address
    except ValueError:
        # Wrong length for IPv4, so treat it as an IPv6 address.
        return socket.inet_ntop(socket.AF_INET6, ip)


def captureData(iface):
    """Sniff ``iface`` and print one dictionary per decoded DNS packet.

    The capture runs until interrupted with Ctrl+C, after which the number
    of decoded packets is printed.

    Args:
        iface: name of the network interface to capture on.
    """
    pkt = pcap.pcap(iface, promisc=True, immediate=True, timeout_ms=50)
    filters = {
        'DNS': 'udp port 53',
        'HTTP': 'tcp port 80',
    }
    # Actually install the filter that is announced below; previously it was
    # only printed, so every UDP packet was fed to the DNS decoder.
    pkt.setfilter(filters['DNS'])
    print(filters['DNS'])
    print('Start capture...')
    pkts_count = 0
    try:
        for ptime, pdata in pkt:
            mydict = anlysisData(ptime, pdata)
            if mydict:
                pkts_count += 1
                print(mydict)
    except KeyboardInterrupt as e:
        print(e)
    print('%d packets received' % (pkts_count))


def _summarize_answer(rr):
    """Describe one answer RR.

    Returns:
        A ``(record, values)`` pair: ``record`` is the per-answer dictionary
        and ``values`` lists the data items this answer contributes to the
        packet-level ``"value"`` list, in order.
    """
    record = {
        "name": rr.name,
        # Types/classes missing from the tables must not abort the capture.
        "Type": RR_TYPES.get(rr.type, "unknown"),
        "Class": RR_CLASSES.get(rr.cls, RR_CLASSES[DNS_IN]),
        "ttl": rr.ttl,
        "Data_length": rr.rlen,
    }
    values = []
    if rr.type == DNS_A:
        address = ip_addr(rr.rdata)
        record["Address"] = address
        values.append(address)
    elif rr.type == DNS_NS:
        record["nsname"] = rr.nsname
        values.append(rr.nsname)
    elif rr.type == DNS_CNAME:
        record["cname"] = rr.cname
        values.append(rr.cname)
    elif rr.type == DNS_PTR:
        record["ptrname"] = rr.ptrname
        values.append(rr.ptrname)
    elif rr.type == DNS_SOA:
        record["mname"] = rr.mname
        record["rname"] = rr.rname
        values.append(rr.rname)
        values.append(rr.mname)
    elif rr.type == DNS_MX:
        record["mxname"] = rr.mxname
        values.append(rr.mxname)
    elif rr.type == DNS_TXT or rr.type == DNS_HINFO:
        record["text"] = rr.text
        values.append(rr.text)
    elif rr.type == DNS_AAAA:
        record["ip6"] = socket.inet_ntop(socket.AF_INET6, rr.ip6)
        values.append(record["ip6"])
    elif rr.type == DNS_NULL:
        record["null"] = rr.null
        values.append(record["null"])
    elif rr.type == DNS_SRV:
        record["srvname"] = rr.srvname
        values.append(record["srvname"])
    return record, values


def anlysisData(ptime, data):
    """Decode one captured frame into a DNS summary dictionary.

    Only Ethernet frames carrying IPv4 + UDP with a payload that dpkt can
    parse as DNS are accepted.

    Args:
        ptime: capture timestamp supplied by pcap.
        data: raw frame bytes.

    Returns:
        ``None`` when the frame is not a decodable DNS-over-UDP/IPv4 packet.
        Otherwise a dictionary with the DNS header fields, the first question
        (if any) and the IP/UDP endpoints. Queries (``qr == 0``) add
        ``"when"`` and ``"msg_size_req"``; responses add
        ``"msg_size_rcvd"``, ``"collect_time"``, ``"answers"``, ``"value"``,
        ``"cnames"`` and ``"ips"``.
    """
    try:
        eth = dpkt.ethernet.Ethernet(data)
    except Exception:
        # Truncated or malformed frame: skip it.
        return None
    ip = eth.data
    if not isinstance(ip, dpkt.ip.IP):
        # Anything that is not IPv4 (raw bytes, IPv6, ARP, ...) is ignored.
        return None
    myudp = ip.data
    if not isinstance(myudp, dpkt.udp.UDP):
        return None
    try:
        my_dns = dpkt.dns.DNS(myudp.data)
    except Exception:
        # Not DNS, or the payload is malformed. ``Exception`` rather than a
        # bare ``except`` so that Ctrl+C still reaches captureData.
        return None

    datadict = {}
    datadict["Transaction_ID"] = my_dns.id
    datadict["Questions"] = len(my_dns.qd)
    datadict["Answer_RRs"] = len(my_dns.an)
    if datadict["Questions"] > 0:
        q = my_dns.qd[0]
        datadict["qname"] = q.name
        datadict["qType"] = RR_TYPES.get(q.type, "unknown")
        # Unexpected class values are reported as "IN".
        datadict["qClass"] = RR_CLASSES.get(q.cls, RR_CLASSES[DNS_IN])
    datadict["sip"] = ip_addr(ip.src)
    datadict["dip"] = ip_addr(ip.dst)
    datadict["sport"] = myudp.sport
    datadict["dport"] = myudp.dport
    datadict["qr"] = my_dns.qr
    datadict["addr_type"] = ip.v
    # Only dpkt.ip.IP objects reach this point, and those carry ``ttl``.
    datadict["ttl"] = ip.ttl
    datadict["rcode"] = my_dns.rcode
    datadict["aa"] = my_dns.aa
    datadict["rd"] = my_dns.rd

    if my_dns.qr == 0:
        datadict["when"] = ptime
        datadict["msg_size_req"] = myudp.ulen
        return datadict

    datadict["msg_size_rcvd"] = myudp.ulen
    datadict["collect_time"] = ptime
    answers = []
    value = []
    cname = []
    ips = []
    for rr in my_dns.an:
        record, values = _summarize_answer(rr)
        answers.append(record)
        value.extend(values)
        if rr.type == DNS_CNAME:
            cname.extend(values)
        elif rr.type in (DNS_A, DNS_AAAA):
            ips.extend(values)
    datadict["answers"] = answers
    datadict["value"] = value
    datadict["cnames"] = cname
    datadict["ips"] = ips
    return datadict


def main():
    """Capture on the interface named on the command line, if given.

    Falls back to ``'enp61s0f3'`` when no argument is supplied.
    """
    iface = sys.argv[1] if len(sys.argv) > 1 else 'enp61s0f3'
    print(iface)
    captureData(iface)


if __name__ == "__main__":
    main()

dpkt官方文档地址,里面有各种报文类的结构和示例。 https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.dns

最新回复(0)