简单抓包工具——仿照wireshark设计的网络协议解析器(python+

简单抓包⼯具——仿照wireshark设计的⽹络协议解析器
(python+scapy+py。。。
⼀、实现功能
基于Tkinter的Python  GUI界⾯设计,能分条展⽰数据包的概要信息(summary()),分层解析数据包,可显⽰数据包的⼗六进制编码值(hexdump());在抓包的同时解析数据包(不能等抓包停⽌后才解析),可判断IP、TCP或UDP数据包的校验和是否正确;⽀持BPF过滤器,
抓包过程可以暂停和停⽌;可将数据包存储在pcap⽂件中,以供wireshark或其它数据包解析⼯具分析;可以在退出时提⽰⽤户进⾏保存
保存的数据包,进⾏保存⼯作;可以在再次开始新的抓包前提⽰⽤户保存未保存的数据包。
注:
⼆、主要函数说明
1.抓取数据包并对抓到的数据包进⾏相应处理
def capture_packet():
1. 获取并设置过滤条件
filters = ()
2.设置停⽌抓包的条件stop_filter
stop_sending.clear()
3.清空抓到的数据包列表
packet_list.clear()
4.⽤sniff(prn=处理抓到的数据包,filter=过滤条件,stop_filter=停⽌抓包的条件)进⾏抓包
sniff(prn=处理抓到的数据包, filter=过滤条件filters, stop_filter=停⽌抓包的条件)
2.对抓到的数据包进⾏处理的函数
def process_packet(packet):
if 没有点击暂停按钮:
1.将抓到的包存在列表中
packet_list.append(packet)
2.获取抓包的时间
packet_time= timestamp2time(packet.time)
3.设置默认源地址和⽬的地址
src = packet[Ether].src
dst = packet[Ether].dst
4.建⽴协议查询字典判断Ether帧中封装的数据包的类型,设置proto字段的值,如果是IP数据包则进⾏第⼆次判断,如果IP数据包中封装的是TCP或UDP数据包则进⾏        type = packet[Ether].type
types = {0x0800:'IPv4',0x0806:'ARP',0x86dd:'IPv6',0x88cc:'LLDP',0x891D:'TTE'}
5.判断Ether帧中封装的数据包类型
6.判断是否为IP包
if proto == 'IPv4':
7.建⽴协议查询字典,更改源地址和⽬的地址为源IP和⽬的IP地址,判断更改proto字段的值
protos = {1: 'ICMP', 2: 'IGMP', 4: 'IP', 6: 'TCP',……17: 'UDP'}
src = packet[IP].src
dst = packet[IP].dst
8. 判断是否包含TCP或UDP包
if TCP包:
protos_tcp = {80: 'Http', 443: 'Https',……, 25: 'SMTP'}
9.建⽴字典,根据端⼝号判断协议类型并修改proto字段的值
elif UDP包:
10.根据端⼝号判断协议类型并修改proto字段的值
11.⾏列表中插⼊捕获的数据包的信息并显⽰
packet_list_tree.insert("", 'end', packet_id, text=packet_id,
values=(packet_id, packet_time, src, dst, proto, length, info))
3.开始按钮的单击响应函数,如果是停⽌后再次开始捕获,要提⽰⽤户保存已经捕获的数据
if 已经停⽌捕获发,但是没进⾏保存操作:
1.弹出选择窗⼝,选择是否进⾏保存数据包。如果选择是则弹出保存⽂件窗⼝进⾏⽂件保存操作再开始新的⼀次抓包操作,选择否则直接开始⼀次新的抓包,选择
2. 设置开始、保存按钮为不可⽤,暂停、停⽌按钮可操作,并设置停⽌标识stop_flag = False
start_button['state'] = DISABLED  # 不可操作
……
stop_button['state'] = NORMAL
stop_flag = False
if 没进⾏暂停操作:
3.清空已经抓到的数据包列表并设置数据包编号为1
4. 开启新线程进⾏抓包,设置保存标识save_flag = False
t = threading.Thread(target=capture_packet)
t.setDaemon(True)
t.start()
save_flag = False
else:
5.设置暂停标识pause_flag = False
4.暂停按钮的单击响应函数,这时仍然在抓包,只是不对抓到的包进⾏处理
def pause_capture():
1.设置开始按钮为可⽤,暂停按钮为不可⽤,并修改暂停标识pause_flag = True
start_button['state'] = NORMAL  # 可操作
pause_button['state'] = DISABLED  # 不可操作
global pause_flag
pause_flag = True
5.停⽌按钮单击响应函数,终⽌线程,退出sniff()函数停⽌抓包
def stop_capture():
1. 终⽌线程,停⽌抓包
stop_sending.set()
2. 设置开始按钮为可⽤,暂停按钮为不可⽤,保存为可⽤,修改暂停标识pause_flag = False,停⽌标识stop_flag = True
start_button['state'] = NORMAL # 可操作
stop_button['state'] = DISABLED
pause_flag = False
stop_flag = True
6.保存按钮的单击响应函数,将抓到的数据包保存为pcap格式的⽂件
def save_captured_data_to_file():
1.弹出保存⽂件窗⼝,获取⽂件保存的位置及名字,并修改保存标识save_flag = True    filename=tkinter.filedialog.asksaveasfilename(title=窗⼝名, filetypes=保存⽂件    if ⽂件命名不包含.pcap:
2.设置默认⽂件格式为 pcap
filename = filename+'.pcap'
3.使⽤wrpcap()函数进⾏保存⽂件操作
wrpcap(filename, packet_list)
7.退出按钮的单击响应函数,终⽌线程,停⽌抓包,退出程序前要提⽰⽤户保存已经捕获的数据
1.终⽌线程,设置停⽌抓包标识,停⽌抓包
stop_sending.set()
if 已经暂停或停⽌:
# 没进⾏保存操作
if 没进⾏保存:
2.弹出保存提醒窗⼝,如果选择保存则先弹出保存窗⼝进⾏保存操作再关闭窗⼝,如果选择否,则直接退出,选择取消则取消退出操作
else已经保存:
3.直接关闭窗⼝
tk.destroy()
8. 数据包列表单击事件响应函数,在数据包列表单击某数据包时,在协议解析区解析此数据包,并在hexdump区显⽰此数据包的⼗六进制
内容
def on_click_packet_list_tree(event):
1.从数据包列表中取出要分析的数据包
packet = packet_list[packet_id]
2.将选取的数据包分层显⽰,遇到#,则新建⼀个分层
3.如果抓到的数据包包含IP包则检查IP、TCP、UDP的校验和
if IP包存在:
4.重新计算选取的数据包的校验和并与接收到的IP包的原校验和进⾏⽐较
if TCP包分装在IP包中:
1.取出TCP包的校验和的值
2.重新计算TCP的校验和的值与原校验和进⾏⽐较
3.如果IP与TCP的校验和值都正确则弹出校验和检查通过提⽰框,如果有⼀项不正确则弹出错误警告窗⼝。
elif UDP包封装在IP包中:
1.取出UDP包的校验和的值
2.重新计算UDP数据包的校验和
3.如果IP与UDP的校验和值都正确则弹出校验和检查通过提⽰框,如果有⼀项不正确则弹出错误警告窗⼝。
三、基础知识准备
1.sniff()函数⽤于抓包,运⾏scapy后,使⽤help(sniff)查看sniff命令的⽤法。
sniff(count=0(抓包数量), store=True(是否存储), offline=None, prn=None(如何处理抓到的包),
lfilter=None(过滤条件), L2socket=None, timeout=None(运count选项
>>> sniff(count=10)
<Sniffed: TCP:0 UDP:6 ICMP:0 Other:4>
>>> a=sniff(count=10)
>>> a.summary()
Ether / ARP who has 10.8.0.91 says 10.8.180.129 / Padding
Ether / IP / UDP 10.8.250.223:55985 > 224.0.0.252:llmnr / LLMNRQuery
Ether / ARP who has 10.8.0.130 says 10.8.180.129 / Padding
Ether / IP / UDP 10.8.164.101:netbios_ns > 10.8.255.255:netbios_ns / NBNSQueryRequest
Ether / ARP who has 10.8.143.141 says 10.8.39.64 / Padding
Ether / ARP who has 10.8.9.147 says 10.8.9.179 / Padding
Ether / ARP who has 10.8.9.154 says 10.8.9.179 / Padding
Ether / ARP who has 10.8.9.175 says 10.8.9.179 / Padding
Ether / ARP who has 10.8.39.3 says 10.8.39.64 / Padding
Ether / IP / UDP 10.8.146.215:netbios_ns > 10.8.255.255:netbios_ns / NBNSQueryRequest
Prn选项(可以是⾃定义函数,lambda表达式)——lambda x : x.summary()----lambda 对象 : 执⾏的操作
>>> sniff(count=10,prn=(lambda x : x.summary()))
Ether / ARP who has 10.8.205.116 says 10.8.4.238 / Padding
Ether / ARP who has 10.8.234.117 says 10.8.66.32 / Padding
Ether / IP / UDP / DNS Ans "b'Fate._companion-link._tcp.local.'"
Ether / ARP who has 10.8.0.64 says 10.8.11.181 / Padding
Ether / ARP who has 10.8.87.211 says 10.8.87.217 / Padding
Ether / IP / UDP 10.8.223.215:netbios_ns > 10.8.255.255:netbios_ns / NBNSQueryRequest
Ether / ARP who has 10.8.87.212 says 10.8.87.217 / Padding
Ether / IPv6 / UDP / DNS Ans "b'Fate._companion-link._tcp.local.'"
Ether / IP / UDP / DNS Ans "b'si=BCD53060-A820-4DFC-B776-071FEDDAA131'"
Ether / ARP who has 10.8.87.214 says 10.8.87.217 / Padding
<Sniffed: TCP:0 UDP:4 ICMP:0 Other:6>
lfilter选项----添加过滤器,python语⾔写的
>>> sniff(count=10,lfilter=(lambda x : x.haslayer(TCP)))
<Sniffed: TCP:10 UDP:0 ICMP:0 Other:0>
>>> sniff(count=10,lfilter=(lambda x : x.haslayer(Padding)))
<Sniffed: TCP:0 UDP:0 ICMP:0 Other:10>
Filter选项-BPF过滤——filter: BPF filter to apply
>>> sniff(count=10,filter='tcp dst port 80')
<Sniffed: TCP:10 UDP:0 ICMP:0 Other:0>
>>>
2.使⽤wrpcap()函数进⾏保存⽂件操作,wrpcap(⽂件保存的绝对路径, 要保存的数据包)
3.filename=tkinter.filedialog.asksaveasfilename(title=窗⼝名, filetypes=保存⽂件类型, initialfile=默认命名),弹出保存⽂件窗⼝,获取⽂件保存的位置及名字。
4. resault = ssagebox.askyesnocancel("保存提醒", "是否保存抓到的数据包"),弹出提⽰⽂本框,如果选择保存
(resault=True)则先弹出保存窗⼝进⾏保存操作再关闭窗⼝,如果选择否(resault=False),则直接退出,选择取消则取消退出操作。
四、主要功能实现代码
1.抓取数据包并对抓到的数据包进⾏相应处理
# 抓取数据包并处理
def capture_packet():
# 设置过滤条件
filters = ()
print("抓包条件:"+filters)
# 设置停⽌抓包的条件stop_filter
简易过滤器
stop_sending.clear()
global packet_list
# 清空列表
packet_list.clear()
# 抓取数据包并将抓到的包存在列表中
sniff(prn=(lambda x: process_packet(x)), filter=filters, stop_filter=(lambda x: stop_sending.is_set()))
2.对抓到的数据包进⾏处理的函数
# 处理抓到的数据包
def process_packet(packet):
if pause_flag == False:
global packet_list
# 将抓到的包存在列表中
packet_list.append(packet)
# 抓包的时间
packet_time= timestamp2time(packet.time)
src = packet[Ether].src
dst = packet[Ether].dst
type = packet[Ether].type
types = {0x0800:'IPv4',0x0806:'ARP',0x86dd:'IPv6',0x88cc:'LLDP',0x891D:'TTE'}
if type in types:
proto = types[type]
else:
proto = 'LOOP'  # 协议
# IP
if proto == 'IPv4':
# 建⽴协议查询字典
protos = {1: 'ICMP', 2: 'IGMP', 4: 'IP', 6: 'TCP', 8: 'EGP', 9: 'IGP', 17: 'UDP', 41: 'IPv6', 50: 'ESP', 89:'OSPF'}            src = packet[IP].src
dst = packet[IP].dst
proto=packet[IP].proto
if proto in protos:
proto=protos[proto]
# tcp
if TCP in packet:
protos_tcp = {80: 'Http', 443: 'Https', 23: 'Telnet', 21: 'Ftp', 20: 'ftp_data', 22: 'SSH', 25: 'SMTP'}
sport = packet[TCP].sport
dport = packet[TCP].dport
if sport in protos_tcp:
proto = protos_tcp[sport]
elif dport in protos_tcp:
proto = protos_tcp[dport]
elif UDP in packet:
if packet[UDP].sport == 53 or packet[UDP].dport == 53:
proto = 'DNS'
length = len(packet)  # 长度
info = packet.summary()  # 信息
global packet_id  # 数据包的编号
packet_list_tree.insert("", 'end', packet_id, text=packet_id,
values=(packet_id, packet_time, src, dst, proto, length, info))
packet_list_tree.update_idletasks()  # 更新列表,不需要修改
packet_id = packet_id + 1
3.开始按钮的单击响应函数,如果是停⽌后再次开始捕获,要提⽰⽤户保存已经捕获的数据

本文发布于:2024-09-21 19:27:11,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/3/351140.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据包   保存   抓到   设置   列表
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议