[Python学习]基于python的子网内主机发现工具开发
实现功能:发现某子网内的存活主机并返回其IP地址
运行系统:Windows、Linux
import的第三方库:ipaddress,netaddr
技术框架:
数据处理部分:使用Python的ctyres模块创建类似于C的结构体,对数据包中IP头部分携带的如协议类型、源IP地址和目的IP地址等信息进行解码,处理并显示IP头和其中的组成部分。
主机发现部分:利用ipaddress库中的IPNetwork方法生成大量IP,并对每一个IP地址的某一常闭端口发送UDP消息,如该主机存活,则会返回特定类型的ICMP消息,对其解析并进行下一步筛选。
主机筛选部分:为确保发现的主机是目标子网内的主机,需要对该IP地址是否属于目标子网进行一次筛选。同时为确保该主机是由我们主动发送UDP消息而被拒收的主机,需要判断其返回的数据包的主体内容是否是我们发送的主体内容。
实现过程:
创建名为sniffer_ip_header_decode.pyd的文件
import及参数设置:
import socket
import threading
import time
import os
import struct
import ipaddress
from netaddr import IPNetwork, IPAddress
from ctypes import *
host = "10.8.241.63" #本机地址
subnet = "10.8.241.0/21" #子网地址
magic_message = "PYTHON" #向主机发送的消息内容
IP头信息解析:
class IP(Structure):
_fields_ = [
("ihl", c_ubyte, 4),
("version", c_ubyte, 4),
("tos", c_ubyte),
("len", c_ushort),
("id", c_ushort),
("offset", c_ushort),
("ttl", c_ubyte),
("protocol_num", c_ubyte),
("sum", c_ushort),
("src", c_ulong),
("dst", c_ulong)
]
def __new__(self, socket_buffer=None):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer=None):
self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"} #使用的协议
self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) #源地址
self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) #目标地址
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
self.protocol = str(self.protocol_num)
ICMP信息解析:
class ICMP(Structure):
_fields_=[
("type", c_ubyte),
("code", c_ubyte),
("checksum", c_ushort),
("unused", c_ushort),
("next_hop_mtu", c_ushort)
]
def __new__(self, socket_buffer):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer):
pass
#windows系统默认支持IP协议,Linux系统默认支持ICMP协议
if os.name == "nt":
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind((host, 22))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
#windows需要设置IOCTL以开启混杂模式
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
向子网内主机的65212端口发送UDP消息:
def udp_sender(subnet, magic_message):
time.sleep(5)
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for ip in IPNetwork(subnet):
try:
sender.sendto(magic_message.encode(), (str(ip) ,65212))
except:
pass
独立开启一个线程执行此任务,以确保发送任务与嗅探任务不会因资源重叠而漏报主机:
t = threading.Thread(target=udp_sender, args=(subnet,magic_message))
t.start()
持续嗅探部分及退出部分:
try:
while True:
raw_buffer = sniffer.recvfrom(65565)[0] #接收到的信息
ip_header = IP(raw_buffer[0:20]) #ip头部分
#为避免过多输出信息 该print语句加了注释
#print("Protocol: %s %s -> %s" %(ip_header.protocol, ip_header.src_address, ip_header.dst_address))
if ip_header.protocol == "ICMP":
#IP头的长度基于其中的ihl字段计算 代表IP头中32位(4字节)长的分片的个数
#即IP头的大小等于该字段的值乘以4
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + sizeof(ICMP)]
#解析ICMP数据
icmp_header = ICMP(buf)
#print("ICMP -> Type:%d Code:%d"%(icmp_header.type, icmp_header.code))
#对于存活主机关闭的端口 会向我们的主机返回一个类型=3 代码值=3的信息
#以表示目标端口不可达
if icmp_header.code == 3 and icmp_header.type == 3:
#确保是目标子网内的主机
if IPAddress(ip_header.src_address) in IPNetwork(subnet):
#确保是我们发送信息的目标主机
if str(raw_buffer[len(raw_buffer)-len(magic_message):]) == magic_message:
#输出该主机的IP地址
print("Host up:%s" % ip_header.src_address)
except KeyboardInterrupt:
#对于windows系统 需要关闭混杂模式
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCALL, socket.RCVALL_OFF)
完整代码:
import socket
import threading
import time
import os
import struct
import ipaddress
from netaddr import IPNetwork, IPAddress
from ctypes import *
host = "10.8.241.63"
subnet = "10.8.241.0/21"
magic_message = "PYTHON"
#define IP Head
class IP(Structure):
_fields_ = [
("ihl", c_ubyte, 4),
("version", c_ubyte, 4),
("tos", c_ubyte),
("len", c_ushort),
("id", c_ushort),
("offset", c_ushort),
("ttl", c_ubyte),
("protocol_num", c_ubyte),
("sum", c_ushort),
("src", c_ulong),
("dst", c_ulong)
]
def __new__(self, socket_buffer=None):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer=None):
self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
self.protocol = str(self.protocol_num)
class ICMP(Structure):
_fields_=[
("type", c_ubyte),
("code", c_ubyte),
("checksum", c_ushort),
("unused", c_ushort),
("next_hop_mtu", c_ushort)
]
def __new__(self, socket_buffer):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer):
pass
if os.name == "nt":
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind((host, 22))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
#windows plat set IOCTL for remix mode
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
def udp_sender(subnet, magic_message):
time.sleep(5)
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for ip in IPNetwork(subnet):
try:
sender.sendto(magic_message.encode(), (str(ip) ,65212))
except:
pass
t = threading.Thread(target=udp_sender, args=(subnet,magic_message))
t.start()
try:
while True:
raw_buffer = sniffer.recvfrom(65565)[0]
ip_header = IP(raw_buffer[0:20])
#print("Protocol: %s %s -> %s" %(ip_header.protocol, ip_header.src_address, ip_header.dst_address))
if ip_header.protocol == "ICMP":
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + sizeof(ICMP)]
icmp_header = ICMP(buf)
#print("ICMP -> Type:%d Code:%d"%(icmp_header.type, icmp_header.code))
if icmp_header.code == 3 and icmp_header.type == 3:
if IPAddress(ip_header.src_address) in IPNetwork(subnet):
if str(raw_buffer[len(raw_buffer)-len(magic_message):]) == magic_message:
print("Host up:%s" % ip_header.src_address)
except KeyboardInterrupt:
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCALL, socket.RCVALL_OFF)
运行结果:
运行平台:Windows
运行方式:
1. 以管理员身份运行cmd窗口
2. 关闭本机防火墙设置
3. 切换到脚本所在目录
4. 执行python sniffer_ip_header_decode.py命令
运行结果: