2014-09-04 33 views
2

當我收到帶scapy的ICMP Destination unreachable(需要分段碎片的ICMP TYPE = 3 CODE = 4)消息時,我正在進行TCP重傳行爲測試。如何使用scapy從數據包中讀取整個ip層和tcp層?

測試流程是這樣的:
1.建立TCP連接到服務器
2.發送HTTP GET請求到服務器時TCP建立
3.當HTTP響應回
4.發送一個ICMP類型3代碼4消息到服務器,帶有小的MTU集合

問題是,ICMP TYPE = 3 CODE = 4消息包含該HTTP的IP標頭和部分TCP標頭(srt,dst和seq編號)響應數據包。目前,我只是從HTTP響應數據包中讀取每個參數(如IP標識,frag標記,ttl等)。現在的問題是:有沒有什麼辦法,我可以讀取該數據包的全IP和TCP報頭:

ICMP(TYPE=3 CODE=4)/IP Header/TCP Header

回答

1

希望以下將幫助:

>>> pkt = ICMP()/IP()/TCP() 
>>> ipHeader = pkt.getlayer(IP) 
>>> ipHeader 
<IP frag=0 proto=tcp |<TCP |>> 
>>> 

只獲得IP報頭:

>>> pkt = Ether()/IP()/TCP() 
>>> ip = pkt.getlayer(IP) 
>>> ip 
<IP frag=0 proto=tcp |<TCP |>> 
>>> ip.remove_payload() 
>>> ip 
<IP |> 
>>> 
+1

謝謝,但這似乎得到全IP層,包括IP包頭,TCP報頭和TCP有效載荷,而我只需要IP和TCP報頭。 – Johnson 2014-09-19 06:33:38

0

我將數據包對象轉換爲dict對象,使我的解析生活更輕鬆。 代碼:

from scapy.all import * 
from cStringIO import StringIO 
import sys 
class Capturing(list): 
     """ 
     This class will capture sys.out. 
     More info: 
     http://stackoverflow.com/questions/16571150/how-to-capture-stdout-output-from-a-python-function-call 
     """ 
    def __enter__(self): 
     self._stdout = sys.stdout 
     sys.stdout = self._stringio = StringIO() 
     return self 
    def __exit__(self, *args): 
     self.extend(self._stringio.getvalue().splitlines()) 
     del self._stringio # free up some memory 
     sys.stdout = self._stdout 
class PacketDict(dict): 
     """ 
     This class will convert packet into a dict by using the result of packet.show2(). Furthermore the original 
     packet will be also saved as attribute '.packet'. 
     More class functions could be added, currently only support 'haslayer()'. 
     Scapy version: scapy-2.3.3 
     """ 
    def __init__(self, pkt): 
     self.packet = pkt 
     self.__packet_to_dict() 
    def __extract_key(self, line): 
     a = line.lstrip("###[ ").rstrip(" ]### ") 
     return a 
    def __extract_value_to_dict(self, line): 
     if line.find("=") > -1: 
      b = line.replace(" ","") 
      a = b.split("=") 
      return {a[0]: a[1]} 
     return {line.replace(" ",""): None} 
    def __packet_to_dict(self): 
     with Capturing() as packet_in_list: 
      self.packet.show2() 
     current_dict = self 
     for line in packet_in_list: 
      if line.strip() != "": 
       line = line.replace("|","") 
       if line.find('###[') > -1: 
        key = self.__extract_key(line) 
        current_dict[key] = {} 
        current_dict = current_dict[key] 
        continue 
       current_dict.update(self.__extract_value_to_dict(line)) 
    def haslayer(self, pkt_cls): 
     return self.packet.haslayer(pkt_cls) 

if __name__ == "__main__": 
    packet_list = rdpcap("/media/sf_ubshare/pcap/test.pcap") 
    for packet in packet_list: 
     a = PacketDict(packet) 
     print a['Ethernet']['IP']['ihl'] 
     print a.haslayer('ISAKMP') 

輸出:

/usr/bin/python2.7 /home/yuanzhi/workspace/scaptest/scaptest.py 
5L 
1 

字典的樣子:

{ 
    "Ethernet": { 
    "src": "5e:22:73:12:50:02", 
    "dst": "6e:30:96:e3:a0:6c", 
    "type": "0x800", 
    "IP": { 
     "frag": "0L", 
     "src": "1.0.3.0", 
     "UDP": { 
     "dport": "isakmp", 
     "ISAKMP": { 
      "resp_cookie": "'\\xb5A\\x06\\xef\\x126~\\x95'", 
      "exch_type": "identityprot.", 
      "length": "204", 
      "version": "0x10", 
      "flags": "", 
      "init_cookie": "'2\\x12\\xbda\\xee\\xa8\\xba\\xa6'", 
      "ISAKMP SA": { 
      "IKE proposal": { 
       "SPI": "''", 
       "length": "44", 
       "IKE Transform": { 
       "length": "36", 
       "num": "0", 
       "transforms": "[('Encryption','AES-CBC'),('KeyLength',256),('Hash','SHA'),('Authentication','PSK'),('GroupDesc','1024MODPgr'),('LifeType','Seconds'),('LifeDuration',43200)]", 
       "ISAKMP Vendor ID": {......