modbus和siemensS7协议鉴别攻击

说白了就是个扫描脚本啦。是在一本工控书籍上看到的,可以扫描出运行modbus协议的设备或者西门子s7-200和西门子s7-300的plc的一些信息,可以作为一个演示的方法,python写的python下运行就ok了,运行方法python plcscan.py ip

modbus和siemensS7协议鉴别攻击

第一个  modbus.py

from struct import pack,unpack

import socket

from optparse import OptionGroup

import string

__FILTER = "".join([' '] + [' ' if chr(x) not in string.printable or chr(x) in string.whitespace else chr(x) for x in range(1,256)])
def StripUnprintable(msg):
    return msg.translate(__FILTER)

class ModbusProtocolError(Exception):
    def __init__(self, message, packet=''):
        self.message = message
        self.packet = packet
    def __str__(self):
        return "[Error][ModbusProtocol] %s" % self.message

class ModbusError(Exception):
    _errors = {
        0:      'No reply',
        # Modbus errors
        1:      'ILLEGAL FUNCTION',
        2:      'ILLEGAL DATA ADDRESS',
        3:      'ILLEGAL DATA VALUE',
        4:      'SLAVE DEVICE FAILURE',
        5:      'ACKNOWLEDGE',
        6:      'SLAVE DEVICE BUSY',
        8:      'MEMORY PARITY ERROR',
        0x0A:   'GATEWAY PATH UNAVAILABLE',
        0x0B:   'GATEWAY TARGET DEVICE FAILED TO RESPOND'
    }
    def __init__(self,  code):
        self.code = code
        self.message = ModbusError._errors[code] if ModbusError._errors.has_key(code) else 'Unknown Error'
    def __str__(self):
        return "[Error][Modbus][%d] %s" % (self.code, self.message)


class ModbusPacket:
    def __init__(self, transactionId=0, unitId=0, functionId=0, data=''):
        self.transactionId = transactionId
        self.unitId = unitId
        self.functionId = functionId
        self.data = data

    def pack(self):
        return pack('!HHHBB',
            self.transactionId,          # transaction id
            0,                           # protocol identifier (reserved 0)
            len(self.data)+2,            # remaining length
            self.unitId,                 # unit id
            self.functionId              # function id
        ) + self.data                    # data

    def unpack(self,packet):
        if len(packet)<8:
            raise ModbusProtocolError('Response too short', packet)

        self.transactionId, self.protocolId, length, self.unitId, self.functionId = unpack('!HHHBB',packet[:8])
        if len(packet) < 6+length:
            raise ModbusProtocolError('Response too short', packet)

        self.data = packet[8:]

        return self

class Modbus:
    def __init__(self, ip, port=502, uid=0, timeout=8):
        self.ip = ip
        self.port = port
        self.uid = uid
        self.timeout = timeout

    def Request(self, functionId, data=''):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)

        sock.connect((self.ip,self.port))

        sock.send(ModbusPacket(0, self.uid, functionId, data).pack())

        reply = sock.recv(1024)

        if not reply:
            raise ModbusError(0)

        response = ModbusPacket().unpack(reply)

        if response.unitId != self.uid:
            raise ModbusProtocolError('Unexpected unit ID or incorrect packet', reply)

        if response.functionId != functionId:
            raise ModbusError(ord(response.data[0]))

        return response.data

    def DeviceInfo(self):
        res = self.Request(0x2b, '\x0e\x01\00')

        if res and len(res)>5:
            objectsCount = ord(res[5])
            data = res[6:]
            info = ''
            for i in range(0, objectsCount):
                info += data[2:2+ord(data[1])]
                info += ' '
                data = data[2+ord(data[1]):]
            return info
        else:
            raise ModbusProtocolError('Packet format (reply for device info) wrong', res)

def ScanUnit(ip, port, uid, timeout, function=None, data=''):
    con = Modbus(ip, port, uid, timeout)

    unitInfo = []
    if function:
        try:
            response = con.Request(function, data)
            unitInfo.append("Response: %s\t(%s)" % (StripUnprintable(response), response.encode('hex')))
        except ModbusError as e:
            if e.code:
                unitInfo.append("Response error: %s" % e.message)
            else:
                return unitInfo

    try:
        deviceInfo = con.DeviceInfo()
        unitInfo.append("Device: %s" % deviceInfo)
    except ModbusError as e:
        if e.code:
            unitInfo.append("Device info error: %s" % e.message)
        else:
            return unitInfo

    return unitInfo

def Scan(ip, port, options):
    res = False
    try:
        data = options.modbus_data.decode('string-escape') if options.modbus_data else ''

        if options.brute_uid:
            uids = [0,255] + range(1,255)
        elif options.modbus_uid:
            uids = [int(uid.strip()) for uid in options.modbus_uid.split(',')]
        else:
            uids = [0,255]

        for uid in uids:
            unitInfo = ScanUnit(ip, port, uid, options.modbus_timeout, options.modbus_function, data)

            if unitInfo:
                if not res:
                    print "%s:%d Modbus/TCP" % (ip, port)
                    res = True
                print "  Unit ID: %d" % uid
                for line in unitInfo:
                    print "    %s" % line

        return res

    except ModbusProtocolError as e:
        print "%s:%d Modbus protocol error: %s (packet: %s)" % (ip, port, e.message, e.packet.encode('hex'))
        return res
    except socket.error as e:
        print "%s:%d %s" % (ip, port, e)
        return res

def AddOptions(parser):
    group = OptionGroup(parser, "Modbus scanner")
    group.add_option("--brute-uid", action="store_true", help="Brute units ID", default=False)
    group.add_option("--modbus-uid", help="Use uids from list", type="string", metavar="UID")
    group.add_option("--modbus-function", help="Use modbus function NOM for discover units", type="int", metavar="NOM")
    group.add_option("--modbus-data", help="Use data for for modbus function", default="", metavar="DATA")
    group.add_option("--modbus-timeout", help="Timeout for modbus protocol (seconds)", default=8, type="float", metavar="TIMEOUT")
    parser.add_option_group(group)

第二个  s7.py

from struct import *
from random import randint
from optparse import OptionGroup

import struct
import socket
import string

__FILTER = "".join([' '] + [' ' if chr(x) not in string.printable or chr(x) in string.whitespace else chr(x) for x in range(1,256)])
def StripUnprintable(msg):
    return msg.translate(__FILTER)

class TPKTPacket:
    """ TPKT packet. RFC 1006
    """
    def __init__(self, data=''):
        self.data = str(data)
    def pack(self):
        return pack('!BBH',
            3,                  # version
            0,                  # reserved
            len(self.data)+4    # packet size
        ) + str(self.data)
    def unpack(self,packet):
        try:
            header = unpack('!BBH', packet[:4])
        except struct.error as e:
            raise S7ProtocolError("Unknown TPKT format")

        self.data = packet[4:4+header[2]]
        return self

class COTPConnectionPacket:
    """ COTP Connection Request or Connection Confirm packet (ISO on TCP). RFC 1006
    """
    def __init__(self, dst_ref=0, src_ref=0, dst_tsap=0, src_tsap=0, tpdu_size=0):
        self.dst_ref    = dst_ref
        self.src_ref    = src_ref
        self.dst_tsap   = dst_tsap
        self.src_tsap   = src_tsap
        self.tpdu_size  = tpdu_size

    def pack(self):
        """ make Connection Request Packet
        """
        return pack('!BBHHBBBHBBHBBB',
            17,             # size
            0xe0,           # pdu type: CR
            self.dst_ref,
            self.src_ref,
            0,              # flag
            0xc1, 2, self.src_tsap,
            0xc2, 2, self.dst_tsap,
            0xc0, 1, self.tpdu_size )
    def __str__(self):
        return self.pack()

    def unpack(self, packet):
        """ parse Connection Confirm Packet (header only)
        """
        try:
            size, pdu_type, self.dst_ref, self.src_ref, flags = unpack('!BBHHB', packet[:7])
        except struct.error as e:
            raise S7ProtocolError("Wrong CC packet format")
        if len(packet) != size + 1:
            raise S7ProtocolError("Wrong CC packet size")
        if pdu_type != 0xd0:
            raise S7ProtocolError("Not a CC packet")

        return self

class COTPDataPacket:
    """ COTP Data packet (ISO on TCP). RFC 1006
    """
    def __init__(self, data=''):
        self.data = data
    def pack(self):
        return pack('!BBB',
            2,                      # header len
            0xf0,                   # data packet
            0x80) + str(self.data)
    def unpack(self, packet):
        self.data = packet[ord(packet[0])+1:]
        return self
    def __str__(self):
        return self.pack()

class S7Packet:
    """ S7 packet
    """
    def __init__(self, type=1, req_id=0, parameters='', data=''):
        self.type       = type
        self.req_id     = req_id
        self.parameters = parameters
        self.data       = data
        self.error      = 0

    def pack(self):
        if self.type not in [1,7]:
            raise S7ProtocolError("Unknown pdu type")
        return ( pack('!BBHHHH',
            0x32,                   # protocol s7 magic
            self.type,              # pdu-type
            0,                      # reserved
            self.req_id,            # request id
            len(self.parameters),   # parameters length
            len(self.data)) +       # data length
                 self.parameters +
                 self.data )

    def unpack(self, packet):
        try:
            if ord(packet[1]) in [3,2]:   # pdu-type = response
                header_size = 12
                magic0x32, self.type, reserved, self.req_id, parameters_length, data_length, self.error = unpack('!BBHHHHH', packet[:header_size])
                if self.error:
                    raise S7Error(self.error)
            elif ord(packet[1]) in [1,7]:
                header_size = 10
                magic0x32, self.type, reserved, self.req_id, parameters_length, data_length = unpack('!BBHHHH', packet[:header_size])
            else:
                raise S7ProtocolError("Unknown pdu type (%d)" % ord(packet[1]))
        except struct.error as e:
            raise S7ProtocolError("Wrong S7 packet format")

        self.parameters = packet[header_size:header_size+parameters_length]
        self.data = packet[header_size+parameters_length:header_size+parameters_length+data_length]
        return self

    def __str__(self):
        return self.pack()


class S7ProtocolError(Exception):
    def __init__(self, message, packet=''):
        self.message = message
        self.packet = packet
    def __str__(self):
        return "[ERROR][S7Protocol] %s" % self.message

class S7Error( Exception ):
    _errors = {
        # s7 data errors
        0x05: 'Address Error',
        0x0a: 'Item not available',
        # s7 header errors
        0x8104: 'Context not supported',
        0x8500: 'Wrong PDU size'
    }
    def __init__(self, code):
        self.code = code
    def __str__(self):
        if S7Error._errors.has_key(self.code):
            message = S7Error._errors[self.code]
        else:
            message = 'Unknown error'
        return "[ERROR][S7][0x%x] %s" % (self.code, message)


def Split(ar,size):
    """ split sequence into blocks of given size
    """
    return [ar[i:i+size] for i in range(0, len(ar), size)]

class s7:
    def __init__(self, ip, port, src_tsap=0x200, dst_tsap=0x201, timeout=8):
        self.ip       = ip
        self.port     = port
        self.req_id   = 0
        self.s        = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.dst_ref  = 0
        self.src_ref  = 0x04
        self.dst_tsap = dst_tsap
        self.src_tsap = src_tsap
        self.timeout  = timeout

    def Connect(self):
        """ Establish ISO on TCP connection and negotiate PDU
        """
        #sleep(1)
        self.src_ref = randint(1, 20)
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.settimeout(self.timeout)
        self.s.connect((self.ip, self.port))
        self.s.send(TPKTPacket(COTPConnectionPacket(self.dst_ref, self.src_ref, self.dst_tsap, self.src_tsap, 0x0a)).pack())
        reply = self.s.recv(1024)
        response = COTPConnectionPacket().unpack(TPKTPacket().unpack(reply).data)

        self.NegotiatePDU()

    def Request(self, type, parameters='', data=''):
        """ Send s7 request and receive response
        """
        packet = TPKTPacket(COTPDataPacket(S7Packet(type, self.req_id, parameters, data))).pack()
        self.s.send(packet)
        reply = self.s.recv(1024)
        response = S7Packet().unpack(COTPDataPacket().unpack(TPKTPacket().unpack(reply).data).data)
        if self.req_id != response.req_id:
            raise S7ProtocolError('Sequence ID not correct')
        return response


    def NegotiatePDU(self, pdu=480):
        """ Send negotiate pdu request and receive response. Reply no matter
        """
        response = self.Request(0x01, pack('!BBHHH',
            0xf0,       # function NegotiatePDU
            0x00,       # unknown
            0x01,       # max number of parallel jobs
            0x01,       # max number of parallel jobs
            pdu))      # pdu length

        func, unknown, pj1, pj2, pdu = unpack('!BBHHH', response.parameters)
        return pdu

    def Function(self, type, group, function, data=''):
        parameters = pack('!LBBBB',
            0x00011200 +            # parameter head (magic)
            0x04,                   # parameter length
            0x11,                   # unknown
            type*0x10+group,        # type, function group
            function,               # function
            0x00 )                  # sequence

        data = pack('!BBH', 0xFF, 0x09, len(data)) + data
        response = self.Request(0x07, parameters, data)

        code, transport_size, data_len = unpack('!BBH', response.data[:4])
        if code != 0xFF:
            raise S7Error(code)
        return response.data[4:]

    def ReadSZL(self, szl_id):
        szl_data = self.Function(
            0x04,                   # request
            0x04,                   # szl-functions
            0x01,                   # read szl
            pack('!HH',
                szl_id,             # szl id
                1))                 # szl index

        szl_id, szl_index, element_size, element_count = unpack('!HHHH', szl_data[:8])

        return Split(szl_data[8:], element_size)

def BruteTsap(ip, port, src_tsaps=(0x100, 0x200), dst_tsaps=(0x102, 0x200, 0x201) ):
    for src_tsap in src_tsaps:
        for dst_tsap in dst_tsaps:
            try:
                con = s7(ip, port)
                con.src_tsap = src_tsap
                con.dst_tsap = dst_tsap
                con.Connect()
                return src_tsap, dst_tsap

            except S7ProtocolError as e:
                pass

    return None

def GetIdentity(ip, port, src_tsap, dst_tsap):
    res = []

    szl_dict = {
        0x11:
                { 'title': 'Module Identification',
                  'indexes': {
                      1:'Module',
                      6:'Basic Hardware',
                      7:'Basic Firmware'
                  },
                  'packer': {
                      (1, 6): lambda(packet): "{0:s} v.{2:d}.{3:d}".format(*unpack('!20sHBBH', packet)),
                      (7,): lambda(packet): "{0:s} v.{3:d}.{4:d}.{5:d}".format(*unpack('!20sHBBBB', packet))
                  }
                },
        0x1c:
                { 'title': 'Component Identification',
                  'indexes': {
                      1: 'Name of the PLC',
                      2: 'Name of the module',
                      3: 'Plant identification',
                      4: 'Copyright',
                      5: 'Serial number of module',
                      6: 'Reserved for operating system',
                      7: 'Module type name',
                      8: 'Serial number of memory card',
                      9: 'Manufacturer and profile of a CPU module',
                      10:'OEM ID of a module',
                      11:'Location designation of a module'
                  },
                  'packer': {
                      (1, 2, 5): lambda(packet): "%s" % packet[:24],
                      (3, 7, 8): lambda(packet): "%s" % packet[:32],
                      (4,): lambda(packet): "%s" % packet[:26]
                  }
                }
    }

    con = s7(ip, port, src_tsap, dst_tsap)
    con.Connect()

    for szl_id in szl_dict.keys():
        try:
            entities = con.ReadSZL(szl_id)
        except S7Error:
            continue

        indexes = szl_dict[szl_id]['indexes']
        packers = szl_dict[szl_id]['packer']
        for item in entities:
            if len(item)>2:
                n, = unpack('!H', item[:2])
                item = item[2:]
                title = indexes[n] if indexes.has_key(n) else "Unknown (%d)" % n

                try:
                    packers_keys = [ i for i in packers.keys() if n in i ]
                    formated_item = packers[packers_keys[0]](item).strip('\x00')
                except (struct.error, IndexError) :
                    formated_item = StripUnprintable(item).strip('\x00')

                res.append("%s: %s\t(%s)" % (title.ljust(25), formated_item.ljust(30), item.encode('hex')))

    return res

def Scan(ip, port, options):
    src_tsaps = [ int(n.strip(), 0) for n in options.src_tsap.split(',') ] if options.src_tsap else [0x100, 0x200]
    dst_tsaps = [ int(n.strip(), 0) for n in options.dst_tsap.split(',') ] if options.dst_tsap else [0x102, 0x200, 0x201]

    res = ()
    try:
        res = BruteTsap(ip, port, src_tsaps, dst_tsaps)
    except socket.error as e:
        print "%s:%d %s" % (ip, port, e)

    if not res:
        return False

    print "%s:%d S7comm (src_tsap=0x%x, dst_tsap=0x%x)" % (ip, port, res[0], res[1])

    # sometimes unexpected exceptions occures, so try to get identity several time
    identities = []
    for attempt in [0, 1]:
        try:
            identities = GetIdentity(ip, port, res[0], res[1])
            break
        except (S7ProtocolError, socket.error) as e:
            print "  %s" % e

    for line in identities:
        print "  %s" % line

    return True

def AddOptions(parser):
    group = OptionGroup(parser, "S7 scanner options")
    group.add_option("--src-tsap", help="Try this src-tsap (list) (default: 0x100,0x200)", type="string", metavar="LIST")
    group.add_option("--dst-tsap", help="Try this dst-tsap (list) (default: 0x102,0x200,0x201)", type="string", metavar="LIST")
    parser.add_option_group(group)
第三个 plcscan.py
import modbus
import s7
import sys
from optparse import OptionParser
import socket
import struct
import time

def status(msg):
    sys.stderr.write(msg[:-1][:39].ljust(39,' ')+msg[-1:])

def get_ip_list(mask):
    try:
        net_addr,mask = mask.split('/')
        mask = int(mask)
        start, = struct.unpack('!L', socket.inet_aton(net_addr))
        start &= 0xFFFFFFFF << (32-mask)
        end = start | ( 0xFFFFFFFF >> mask )
        return [socket.inet_ntoa(struct.pack('!L', addr)) for addr in range(start+1, end)]
    except (struct.error,socket.error):
        return []

def scan(argv):
    parser = OptionParser(
        usage = "usage: %prog [options] [ip range]...",
        description = """Scan IP range for PLC devices. Support MODBUS and S7COMM protocols
        """
    )
    parser.add_option("--hosts-list", dest="hosts_file", help="Scan hosts from FILE", metavar="FILE")
    parser.add_option("--ports", dest="ports", help="Scan ports from PORTS", metavar="PORTS", default="102,502")
    parser.add_option("--timeout", dest="connect_timeout", help="Connection timeout (seconds)", metavar="TIMEOUT", type="float", default=1)

    modbus.AddOptions(parser)
    s7.AddOptions(parser)

    (options, args) = parser.parse_args(argv)

    scan_hosts = []
    if options.hosts_file:
        try:
            scan_hosts = [file.strip() for file in open(options.hosts_file, 'r')]
        except IOError:
            print "Can't open file %s" % options.hosts_file

    for ip in args:
        scan_hosts.extend(get_ip_list(ip) if '/' in ip else
                          [ip])

    scan_ports = [int(port) for port in options.ports.split(',')]

    if not scan_hosts:
        print "No targets to scan\n\n"
        parser.print_help()
        exit()

    status("Scan start...\n")
    for host in scan_hosts:
        splitted = host.split(':')
        host = splitted[0]
        if len(splitted)==2:
            ports = [int(splitted[1])]
        else:
            ports = scan_ports
        for port in ports:
            status("%s:%d...\r" % (host, port))
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                time.sleep(1)
                sock.settimeout(options.connect_timeout)
                sock.connect((host,port))
                sock.close()
            except socket.error:
                continue

            if port == 102:
                res = s7.Scan(host, port, options)
            elif port == 502:
                res = modbus.Scan(host, port, options)
            else:
                res = modbus.Scan(host, port, options) or s7.Scan(host, port, options)

            if not res:
                print "%s:%d unknown protocol" % (host, port)


    status("Scan complete\n")

if __name__=="__main__":
    try:
        scan(sys.argv[1:])
    except KeyboardInterrupt:
        status("Scan terminated\n")