我最近一直在做類似的事情,請參考下面這段 Python 程式碼:
#!/usr/bin/python
import socket, sys, io
from struct import *
from time import localtime, strftime
#-------------------------------------------------------------------------------
class ETHHeader:
    """Ethernet header decoded from the first 14 bytes of a frame.

    Attributes:
        destination_addr: raw 6-byte destination MAC address.
        source_addr: raw 6-byte source MAC address.
        ether_type: the 16-bit type field in host byte order
            (0x0800 for IPv4).
        protocol: legacy value ``socket.ntohs(ether_type)``, kept so
            existing readers of this attribute see the same number as
            before (8 for IPv4 on little-endian hosts).
    """

    ETHERTYPE_IPV4 = 0x0800

    def __init__(self, s):
        """Read 14 bytes from the binary stream *s* and decode them.

        ``unpack`` raises ``struct.error`` when fewer than 14 bytes
        are available.
        """
        data = s.read(14)
        hdr = unpack("!6s6sH", data)
        self.destination_addr = hdr[0]  # mac address
        self.source_addr = hdr[1]  # mac address
        # "!" already converts the field to host order; is_IP() relies
        # on this value so the check does not depend on host endianness.
        self.ether_type = hdr[2]
        self.protocol = socket.ntohs(hdr[2])

    def src_addr(self):
        """Return the source MAC address as a colon-separated hex string."""
        return addr_to_str(self.source_addr)

    def dst_addr(self):
        """Return the destination MAC address as a colon-separated hex string."""
        return addr_to_str(self.destination_addr)

    def is_IP(self):
        """Return True when the frame's type field is 0x0800 (IPv4)."""
        return self.ether_type == self.ETHERTYPE_IPV4


def addr_to_str(a):
    """Convert a 6-byte ethernet address into a colon-separated hex string.

    Accepts a Python 2 ``str`` as well as ``bytes``/``bytearray``: items
    that are one-character strings are passed through ``ord``, integer
    items are used as they are.
    """
    octets = tuple(ord(c) if isinstance(c, str) else c for c in a[:6])
    return "%.2x:%.2x:%.2x:%.2x:%.2x:%.2x" % octets
#-------------------------------------------------------------------------------
class IPHeader:
    """IPv4 header read from a binary stream.

    Attributes:
        header_length: header size in bytes, taken from the IHL field.
        protocol: numeric protocol field (e.g. 6, 1, 17).
        src_addr: source address as a dotted-quad string.
        dst_addr: destination address as a dotted-quad string.
    """

    def __init__(self, s):
        """Decode the header from *s*, leaving *s* at the payload.

        The fixed 20 bytes are unpacked; if the IHL field announces a
        longer header, the extra option bytes are consumed as well so
        the next read starts at the transport header. ``unpack`` raises
        ``struct.error`` when fewer than 20 bytes are available.
        """
        iph = unpack('!BBHHHBBH4s4s', s.read(20))
        # Low nibble of the first byte is the header length in 32-bit words.
        self.header_length = (iph[0] & 0x0F) * 4
        self.protocol = iph[6]
        self.src_addr = socket.inet_ntoa(iph[8])
        self.dst_addr = socket.inet_ntoa(iph[9])
        options_length = self.header_length - 20
        # Guard against a bogus IHL < 5: a negative read size would
        # swallow the rest of the stream.
        if options_length > 0:
            s.read(options_length)

    def __str__(self):
        return "(" + self.proto() + " " + self.src_addr + " -> " + self.dst_addr + ")"

    def proto(self):
        """Return "TCP", "ICMP" or "UDP" for known protocols, else "???"."""
        return {6: "TCP", 1: "ICMP", 17: "UDP"}.get(self.protocol, "???")
#-------------------------------------------------------------------------------
class UDPHeader:
    """UDP header decoded from the next 8 bytes of a binary stream.

    Attributes:
        source_port, destination_port, length, checksum: the four
        16-bit header fields, unpacked from network byte order.
    """

    def __init__(self, s):
        """Read 8 bytes from *s* and unpack the four header fields.

        ``unpack`` raises ``struct.error`` when fewer than 8 bytes are
        available.
        """
        hdr = unpack("!HHHH", s.read(8))
        self.source_port = hdr[0]
        self.destination_port = hdr[1]
        self.length = hdr[2]
        self.checksum = hdr[3]
#-------------------------------------------------------------------------------
# Open the capture socket used by the receive loop.
try:
    #s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
    # Using this instead of the above we will get:
    #   - also incoming packets,
    #   - the ethernet header as part of the received packet,
    #   - TCP, UDP, ...
    s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
except socket.error as msg:
    # errno/strerror replace msg[0]/msg[1]: same text for ordinary OS
    # errors, and no indexing failure where exceptions are not subscriptable.
    print("Socket could not be created. Error Code : " + str(msg.errno) + ' Message ' + str(msg.strerror))
    # Exit with a non-zero status so callers can see the failure.
    sys.exit(1)
#-------------------------------------------------------------------------------
def communication_between_ports(udp_header, ports):
    """Return True when both UDP ports of *udp_header* are in *ports*.

    udp_header must expose ``source_port`` and ``destination_port``;
    ports is any container supporting ``in``.
    """
    src = udp_header.source_port
    dst = udp_header.destination_port
    return src in ports and dst in ports
def communication_between_ips(ip_header, ips):
    """Return True when both addresses of *ip_header* are in *ips*.

    ip_header must expose ``src_addr`` and ``dst_addr``; ips is any
    container supporting ``in``.
    """
    src = ip_header.src_addr
    dst = ip_header.dst_addr
    return src in ips and dst in ips
#-------------------------------------------------------------------------------
# Only traffic where both endpoints are in this list is reported.
# Defined once here instead of being rebuilt for every packet.
WATCHED_IPS = ["192.168.1.3", "192.168.1.102"]

# Receive frames forever and print one line per UDP packet exchanged
# between the watched hosts.
while True:
    packet = s.recvfrom(65535)  # buffer size
    data = io.BytesIO(packet[0])
    eth = ETHHeader(data)
    if not eth.is_IP():
        continue
    iph = IPHeader(data)
    # Check the protocol before decoding the payload as a UDP header.
    if iph.proto() != "UDP":
        continue
    if not communication_between_ips(iph, WATCHED_IPS):
        continue
    udph = UDPHeader(data)
    ## To filter by port:
    #if udph.source_port != <PORT-YOU-WANT>:
    #    continue
    timestr = strftime("%H:%M:%S", localtime())
    a = iph.src_addr
    b = iph.dst_addr
    direction = " -> "
    # Always print the two hosts in the same (string) order and let the
    # arrow show which way the packet travelled.
    if a > b:
        a, b = b, a
        direction = " <- "
    print(timestr + ": " + a + direction + b)
我只從每一層(ETH、IP……)提取我需要的資料,但你應該能夠輕鬆地加以擴充。
大部分資訊取自 this blog post。
您是否需要捕獲已發送到服務器的信息?或者你需要編寫一個程序來監聽傳入的數據包(如UDP服務器)? – 2010-11-02 17:13:46