您好,欢迎来到爱够旅游网。
搜索
您的当前位置:首页python3 raw socket syn半连接扫描

python3 raw socket syn半连接扫描

来源:爱够旅游网

如何使用python完成syn半连接扫描

以前一直使用masscan作为端口探测的神器,但是发现在某些linux(ubuntu)下,masscan可能存在一些问题,比如扫描完成了,等待时间变成负的也不结束,如果单独使用可以接受,但是内置到项目中就比较难受了。还有其他路由问题导致masscan卡住,但是很少遇到。。。。。。

环境

什么是SYN扫描

tcp的三次握手就不多说了,而第一次握手发送的就是syn包,扫描端口发送SYN 数据包,如果能够收到SYN+ACK 数据包,则代表此端口开放,如收到RST 数据包,则证明此端口关闭。构造syn包需要构造ip头部信息和tcp包信息。

构造IP头部信息

ip 头部信息需要源ip和目标ip
根据wireshark抓包构造一个ip头部包

# 校验和函数
def checksum(msg):
    """TCP 以及 IP校验和"""
    s = 0
    # 每次取2个字节
    for i in range(0, len(msg), 2):
        w = (msg[i] << 8) + (msg[i + 1])
        s = s + w
    s = (s >> 16) + (s & 0xffff)
    s = ~s & 0xffff
    return s


# 构造IP头部信息
hl_version = (4 << 4) + 5  # version 4
diff = 0
total_len = 44
ident = random.randint(18000, 65535)
flags = 0
offset = 0
ttl = 255  #  ttl
protocol = 6  # 协议类型
check = 0  # 校验和,在没校验之前先置0
src_addr = socket.inet_aton("127.0.0.1")  # 源ip
dst_addr = socket.inet_aton("127.0.0.1")  # 目标ip
buffer = struct.pack('!BBHHBBBBH4s4s', hl_version, diff, total_len, ident, flags, offset, ttl, protocol, check,
                     src_addr, dst_addr)
checkSum = checksum(buffer)
_header = struct.pack('!BBHHBBBBH4s4s', hl_version, diff, total_len, ident, flags, offset, ttl, protocol,
                      checkSum, src_addr, dst_addr)

构造SYN TCP头部信息

tcp 头部信息包含源端口和目标端口 同时还有伪头部

seq = seq
ack_seq = 0
doff = 5
check = 0
offset_res = (doff << 4) + 0
fin, syn, rst, psh, ack, urg = (0, 1, 0, 0, 0, 0) 
tcp_flags = fin + (syn << 1) + (rst << 2) + (psh << 3) + (ack << 4) + (urg << 5)
window = 1200
urg_ptr = 0
tcp_header = struct.pack('!HHLLBBHHH', self.src_port, dst_port, seq, ack_seq, offset_res, tcp_flags, window,
                         check, urg_ptr)
# 伪头部选项 源IP地址(4字节)、目的IP地址(4字节)、协议(2字节)、TCP/UDP包长(2字节)
source_address = socket.inet_aton(self.src_ip)
dest_address = socket.inet_aton(dst_ip)
protocol = 6
tcp_length = len(tcp_header)
psh = struct.pack('!4s4sHH', source_address, dest_address, protocol, tcp_length)
psh = psh + tcp_header
tcp_checksum = checksum(psh)
tcp_header = struct.pack('!HHLLBBHHH', self.src_port, dst_port, seq, ack_seq, offset_res, tcp_flags, window,
                         tcp_checksum, urg_ptr)

完整发送代码

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import os
import queue
import random
import socket
import struct
import threading
import time


def checksum(msg):
    """TCP 以及 IP校验和"""
    s = 0
    # 每次取2个字节
    for i in range(0, len(msg), 2):
        w = (msg[i] << 8) + (msg[i + 1])
        s = s + w
    s = (s >> 16) + (s & 0xffff)
    s = ~s & 0xffff
    return s


class SynScan:
    def __init__(self, ip_list, port_list, rate=2000, timeout=5):
        self.ips = ip_list
        self.ports = port_list
        self.timeout = timeout
        self.socket = self._socket()
        self.rate = rate
        self.src_ip = None
        self.finished = False

        if not self.get_local_ip():
            print("请检查网络......")
            os._exit(1)
        self.seq = random.randint(1000000000, 2000000000)

	# 获取本地IP
    def get_local_ip(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(("8.8.8.8", 0))
            self.src_ip = s.getsockname()[0]
            return True
        except:
            pass
        return False
	
	# 创建IP包
    def ip_headers(self, src_ip, dst_ip):
        hl_version = (4 << 4) + 5
        diff = 0
        total_len = 40
        ident = random.randint(18000, 65535)
        flags = 0
        offset = 0
        ttl = 255
        protocol = 6
        check = 0
        src_addr = socket.inet_aton(src_ip)
        dst_addr = socket.inet_aton(dst_ip)
        buffer = struct.pack('!BBHHBBBBH4s4s', hl_version, diff, total_len, ident, flags, offset, ttl, protocol, check, src_addr, dst_addr)
        checkSum = checksum(buffer)
        _header = struct.pack('!BBHHBBBBH4s4s', hl_version, diff, total_len, ident, flags, offset, ttl, protocol, checkSum, src_addr, dst_addr)
        return _header
	
	# 创建TCP包
    def tcp_headers(self,  src_ip, dst_ip, src_port, dst_port):
        seq = self.seq
        ack_seq = 0
        doff = 5
        check = 0
        offset_res = (doff << 4) + 0
        fin, syn, rst, psh, ack, urg = (0, 1, 0, 0, 0, 0)
        tcp_flags = fin + (syn << 1) + (rst << 2) + (psh << 3) + (ack << 4) + (urg << 5)
        window = 1024
        urg_ptr = 0
        tcp_header = struct.pack('!HHLLBBHHH', src_port, dst_port, seq, ack_seq, offset_res, tcp_flags, window,  check, urg_ptr)
        # 伪头部选项 源IP地址(4字节)、目的IP地址(4字节)、协议(2字节)、TCP/UDP包长(2字节)
        source_address = socket.inet_aton(src_ip)
        dest_address = socket.inet_aton(dst_ip)
        protocol = 6
        tcp_length = len(tcp_header)
        psh = struct.pack('!4s4sHH', source_address, dest_address, protocol, tcp_length)
        psh = psh + tcp_header
        tcp_checksum = checksum(psh)
        tcp_header = struct.pack('!HHLLBBHHH', src_port, dst_port, seq, ack_seq, offset_res, tcp_flags, window, tcp_checksum, urg_ptr)
        return tcp_header

    def _socket(self):
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)  # raw socket 创建
            s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)  # 可自定义 IP头部信息
            s.settimeout(self.timeout)

            return s
        except:
            # print(e)
            return self._socket()

    def recv_packet(self, src_port):
        while 1:
            try:
                recv_packet, addr = self.socket.recvfrom(1024)
                _ip = addr[0]  # ip 信息
                ack_seq = struct.unpack("!L", recv_packet[28:32])[0]  # 回答的seq
                s_port = struct.unpack("!H", recv_packet[20:22])[0]  # 包内源端口
                d_port = struct.unpack("!H", recv_packet[22:24])[0]  # 包内目的端口
                if src_port == d_port and ack_seq == self.seq + 1:
                    if recv_packet[33] == 18:   #  18 代表回包 syn 和 ack 确认包, 端口开放
                        print(f"地址:{_ip}  开放端口:{s_port}")
            except:
                pass
            if self.finished:
                break

    def start(self):
        src_port = random.randint(30000, 60000)  # 随机一个源端口
        t = threading.Thread(target=self.recv_packet, args=(src_port, ))
        t.start()
        time_sleep = 1 / self.rate
        failed_packet = queue.Queue()
        for ip in self.ips:
            for port in self.ports:
                packet = self.ip_headers(self.src_ip, ip) + self.tcp_headers(self.src_ip, ip, src_port, port)
                try:
                    self.socket.sendto(packet, (ip, 0))  # 因为某些操作系统协议栈问题,可能会失败
                except Exception as e:
                    failed_packet.put((ip, packet))
                time.sleep(time_sleep)

        while not failed_packet.empty():  # 重新发送失败的队列数据
            ip, packet = failed_packet.get()
            try:
                self.socket.sendto(packet, (ip, 0))
            except:
                failed_packet.put((ip, packet))
            time.sleep(time_sleep)

        time.sleep(self.timeout+1)
        self.finished = True
        self.socket.close()


if __name__ == '__main__':

    ip_list = ['172.16.0.' + str(i) for i in range(1, 255)]
    port_list = [135, 139, 445, 33, 22, 21, 23, 9100, 9200, 7001]
    synScan = SynScan(ip_list, port_list)
    synScan.start()



结果如下

[root@localhost ]# python3 synscan.py 
地址:172.16.0.10  开放端口:135
地址:172.16.0.10  开放端口:139
地址:172.16.0.10  开放端口:445
地址:172.16.0.10  开放端口:21
地址:172.16.0.28  开放端口:135
地址:172.16.0.28  开放端口:139
地址:172.16.0.28  开放端口:445
地址:172.16.0.28  开放端口:33
地址:172.16.0.31  开放端口:135
地址:172.16.0.31  开放端口:21
地址:172.16.0.38  开放端口:135
地址:172.16.0.38  开放端口:139
地址:172.16.0.38  开放端口:445
地址:172.16.0.47  开放端口:135
地址:172.16.0.47  开放端口:139
地址:172.16.0.47  开放端口:445
地址:172.16.0.53  开放端口:135
地址:172.16.0.53  开放端口:139
地址:172.16.0.53  开放端口:445
地址:172.16.0.  开放端口:135
地址:172.16.0.  开放端口:139
地址:172.16.0.  开放端口:445
地址:172.16.0.  开放端口:33
地址:172.16.0.55  开放端口:135
地址:172.16.0.55  开放端口:139
地址:172.16.0.55  开放端口:445
地址:172.16.0.  开放端口:22
地址:172.16.0.44  开放端口:139
地址:172.16.0.44  开放端口:445

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- igbc.cn 版权所有 湘ICP备2023023988号-5

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务