
组播IP扫描Python脚本

ldpc520 ldpc520 发表于2026-05-11 07:43:52 浏览4 评论0

抢沙发发表评论

组播IP扫描Python脚本

import asyncio
import ipaddress
import logging
import os
import sys
import time
from datetime import datetime
from typing import List, Optional, Tuple
import aiohttp
from aiohttp import ClientTimeout, ClientError

# ================== Configuration ==================
# C-segment (third octet) scan range. NOTE(review): this comment originally
# said 135-255 but the start value is 145 — confirm the intended range.
START_C_SEGMENT = 145
END_C_SEGMENT = 255
# First two octets of the target network (the a.b of a.b.c.d).
BASE_IP = "125.66"
# TCP port of the stream proxy being probed on each host.
PORT = 8848
# URL path used to verify a host actually serves the multicast stream.
STREAM_PATH = "/rtp/239.94.2.52:5140"
CHECK_INTERVAL = 600  # seconds between validation rounds (10 minutes)
MAX_CONCURRENT = 300  # concurrent probes, tuned for a B-segment-sized scan
TIMEOUT = 5  # TCP connect timeout (seconds)
STREAM_TIMEOUT = 8  # HTTP stream-verification timeout (seconds)
PROGRESS_INTERVAL = 500  # log a progress line every N IPs queued
# ============================================

# Shared mutable state, read/written by the scan coroutines.
found_ip: Optional[str] = None   # first IP that passed port + stream checks
scanned_count = 0                # number of IPs whose port has been probed
validated_count = 0              # number of IPs whose port was open
total_ips = 0                    # size of the generated scan list
start_time = None                # datetime when the current scan round began

# Log both to stdout and to scan.log in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('scan.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)


def ensure_ip_file():
    """Create an empty udp_ip.txt in the working directory if missing."""
    if os.path.exists("udp_ip.txt"):
        return
    with open("udp_ip.txt", "w", encoding="utf-8"):
        pass
    logger.info("已创建 udp_ip.txt 文件")


def load_saved_ip() -> Optional[str]:
    """Return the "ip:port" string stored in udp_ip.txt, or None if the
    file is absent or contains only whitespace."""
    try:
        with open("udp_ip.txt", "r", encoding="utf-8") as fh:
            saved = fh.read().strip()
    except FileNotFoundError:
        return None
    return saved or None


def save_ip(ip: str):
    """Persist the validated IP (with the configured port) to udp_ip.txt,
    overwriting any previous entry."""
    record = f"{ip}:{PORT}"
    with open("udp_ip.txt", "w", encoding="utf-8") as out:
        out.write(record)
    logger.info(f"💾 已保存有效 IP: {ip}:{PORT}")


def generate_ip_range(start_c: int, end_c: int, base_ip: str) -> List[str]:
    """Return every address base_ip.c.d with c in [start_c, end_c] and
    d in [0, 255], in ascending order."""
    return [
        f"{base_ip}.{c}.{d}"
        for c in range(start_c, end_c + 1)
        for d in range(256)
    ]


async def check_port(ip: str) -> bool:
    """Return True when a TCP connection to ip:PORT succeeds (and closes
    cleanly) within TIMEOUT seconds, False on timeout or connect failure."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(ip, PORT), timeout=TIMEOUT
        )
        writer.close()
        await writer.wait_closed()
    except (asyncio.TimeoutError, ConnectionRefusedError, OSError) as e:
        logger.debug(f"端口检查失败 {ip}:{PORT} - {type(e).__name__}")
        return False
    return True


async def verify_stream(session: aiohttp.ClientSession, ip: str) -> bool:
    """Probe the stream URL on ip:PORT and report whether it answers HTTP 200.

    Only the response status is inspected; the stream body is never consumed.
    """
    url = f"http://{ip}:{PORT}{STREAM_PATH}"
    request_timeout = ClientTimeout(total=STREAM_TIMEOUT)
    try:
        async with session.get(url, timeout=request_timeout) as response:
            ok = response.status == 200
            if ok:
                logger.debug(f"流验证成功 {url} - 状态码: {response.status}")
            else:
                logger.debug(f"流验证失败 {url} - 状态码: {response.status}")
            return ok
    except (ClientError, asyncio.TimeoutError) as e:
        logger.debug(f"流验证异常 {url} - {type(e).__name__}: {str(e)[:100]}")
        return False


async def scan_worker(
        semaphore: asyncio.Semaphore,
        ip_queue: asyncio.Queue,
        session: aiohttp.ClientSession
):
    """Worker coroutine: pull IPs from the queue and probe them.

    Each IP is checked in two steps: TCP port probe (check_port), then
    HTTP stream verification (verify_stream). The first IP passing both
    is stored in the global found_ip and persisted via save_ip().

    BUGFIX: the original workers exited (break) as soon as found_ip was
    set, leaving up to queue-maxsize items without a task_done() call —
    which made ip_queue.join() in scan_network() hang forever (and could
    also block the producer's put() on a full queue). Workers now keep
    draining the queue after a hit, and task_done() is guaranteed exactly
    once per get() via try/finally; the pool is cancelled by
    scan_network() once join() completes.
    """
    global scanned_count, validated_count, found_ip

    while True:
        ip = await ip_queue.get()
        try:
            # A hit was already found: just drain so join() can complete.
            if found_ip:
                continue

            async with semaphore:
                scanned_count += 1

                # Step 1: is the TCP port open at all?
                if not await check_port(ip):
                    continue

                validated_count += 1
                logger.debug(f"端口开放,开始验证流: {ip}:{PORT}")

                # Step 2: does the stream actually answer with HTTP 200?
                if await verify_stream(session, ip):
                    found_ip = ip
                    logger.info(f"✅ 发现有效 IP(端口+流验证通过): {ip}:{PORT}")
                    save_ip(ip)
        finally:
            # Exactly one task_done() per get(), even on unexpected errors.
            ip_queue.task_done()


async def scan_network() -> Optional[str]:
    """Scan the configured C-segment range; return the first validated IP or None.

    Spawns MAX_CONCURRENT scan_worker() coroutines fed from a bounded
    queue, then waits on ip_queue.join(). NOTE(review): join() only
    completes if every queued item receives a task_done(); if workers can
    exit early while items remain queued, this await may hang — verify
    against scan_worker()'s exit behavior.
    """
    global scanned_count, validated_count, total_ips, start_time, found_ip
    # Reset shared state for this scan round.
    found_ip = None
    scanned_count = 0
    validated_count = 0

    # Build the full IP list for the configured C-segment range.
    ips = generate_ip_range(START_C_SEGMENT, END_C_SEGMENT, BASE_IP)
    total_ips = len(ips)
    start_time = datetime.now()

    logger.info(f"🚀 开始扫描 {BASE_IP}.{START_C_SEGMENT}.0-{BASE_IP}.{END_C_SEGMENT}.255:{PORT}{STREAM_PATH}")
    logger.info(f"   总IP数: {total_ips}, 并发数: {MAX_CONCURRENT}")

    # One shared HTTP session/connector for all stream verifications.
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT, ssl=False)
    timeout = ClientTimeout(total=STREAM_TIMEOUT + 2)

    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        # Semaphore bounds concurrent probes; the bounded queue applies
        # back-pressure to the producer loop below.
        semaphore = asyncio.Semaphore(MAX_CONCURRENT)
        ip_queue = asyncio.Queue(maxsize=MAX_CONCURRENT * 2)

        # Start the worker pool.
        workers = []
        for _ in range(MAX_CONCURRENT):
            worker = asyncio.create_task(scan_worker(semaphore, ip_queue, session))
            workers.append(worker)

        # Producer: feed IPs until the list is exhausted or a hit is found.
        for i, ip in enumerate(ips):
            if found_ip:
                break
            await ip_queue.put(ip)

            # Periodic progress / ETA report.
            if i % PROGRESS_INTERVAL == 0 and i > 0:
                elapsed = (datetime.now() - start_time).total_seconds()
                rate = scanned_count / elapsed if elapsed > 0 else 0
                remaining = total_ips - scanned_count
                eta = remaining / rate if rate > 0 else 0
                logger.info(f"📊 进度: {scanned_count}/{total_ips} "
                            f"({scanned_count / total_ips * 100:.1f}%) "
                            f"速率: {rate:.1f}/s "
                            f"已验证: {validated_count} "
                            f"预计剩余: {eta:.0f}s")

        # Wait until every queued item has been marked task_done().
        await ip_queue.join()

        # Cancel the now-idle workers. NOTE(review): cancelled tasks are
        # not awaited before the session closes — confirm acceptable.
        for worker in workers:
            worker.cancel()

    elapsed = (datetime.now() - start_time).total_seconds()
    logger.info(f"🏁 扫描完成,耗时 {elapsed:.2f} 秒")
    logger.info(f"   扫描: {scanned_count}/{total_ips}, 验证: {validated_count}")

    return found_ip


async def verify_saved_ip() -> bool:
    """Re-check the IP stored in udp_ip.txt: port probe first, then the
    stream itself. Returns False when nothing is saved or either check fails."""
    saved_ip = load_saved_ip()
    if saved_ip is None:
        return False

    ip = saved_ip.split(":")[0]
    logger.info(f"🔍 验证已保存的 IP: {saved_ip}")

    # Step 1: TCP port still open?
    if not await check_port(ip):
        logger.warning(f"❌ 端口已关闭: {saved_ip}")
        return False

    # Step 2: stream still answering?
    connector = aiohttp.TCPConnector(ssl=False)
    session_timeout = ClientTimeout(total=STREAM_TIMEOUT + 2)
    async with aiohttp.ClientSession(connector=connector, timeout=session_timeout) as session:
        if await verify_stream(session, ip):
            logger.info(f"✅ 已保存的 IP 仍然有效: {saved_ip}")
            return True
        logger.warning(f"❌ 流验证失败: {saved_ip}")
        return False


async def main_loop():
    """Top-level loop: validate the saved IP (rescanning when it fails)
    every CHECK_INTERVAL seconds, forever."""
    # Run relative to the script's own directory so udp_ip.txt and
    # scan.log land next to the script regardless of the cron/launch cwd.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)
    logger.info(f"切换到脚本所在目录: {script_dir}")

    ensure_ip_file()

    # Report any previously saved IP once at startup.
    saved_ip = load_saved_ip()
    if saved_ip:
        logger.info(f"📄 检测到已保存的 IP: {saved_ip}")

    while True:
        logger.info("=" * 70)
        logger.info(f"⏰ 开始新一轮检查(间隔 {CHECK_INTERVAL // 60} 分钟)")
        logger.info(f"🎯 目标: {BASE_IP}.{START_C_SEGMENT}.0-{BASE_IP}.{END_C_SEGMENT}.255:{PORT}{STREAM_PATH}")

        # 1. Trust the saved IP if it still validates; otherwise rescan.
        if await verify_saved_ip():
            logger.info("✅ IP 有效,跳过扫描")
        else:
            logger.info("🔄 开始扫描指定C段范围...")
            result = await scan_network()
            if result:
                logger.info(f"🎉 找到新有效 IP: {result}:{PORT}")
            else:
                logger.warning("⚠️ 未找到有效 IP")

        # 2. Sleep until the next round.
        next_check = datetime.fromtimestamp(time.time() + CHECK_INTERVAL)
        logger.info(f"⏳ 下次检查时间: {next_check.strftime('%Y-%m-%d %H:%M:%S')}")
        await asyncio.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    try:
        # Fail fast with an install hint if aiohttp is not available.
        import aiohttp
    except ImportError:
        print("❌ 缺少依赖库 aiohttp,请先安装:")
        print("   pip install aiohttp")
        sys.exit(1)

    try:
        asyncio.run(main_loop())
    except KeyboardInterrupt:
        logger.info("👋 程序被用户中断")
    except Exception as e:
        # Top-level catch-all: log with traceback rather than dying silently.
        logger.error(f"💥 程序异常: {e}", exc_info=True)

在宝塔面板创建一个站点,把脚本(如 scan_gdip.py)上传到站点的相应文件夹中,PHP 版本选择 7.0 及以上即可。

查看Python解释器的位置,执行命令:

which python

which python3

一般都是返回:/usr/bin/python3

方法一

那么在计划任务里的shell执行脚本就是:/usr/bin/python3 /你的py程序路径/scan_gdip.py,如:

/usr/bin/python3 /www/wwwroot/103.193.151.158_8027/scan/scan_gdip.py

不知路径是什么,就对着你要执行的程序鼠标右键属性,复制它显示的路径就是了。

方法二(强烈推荐)

pkill -f scan_gdip.py  #结束上一次执行的进程
cd /www/wwwroot/192.168.9.105_6607/scan/gd && python3 scan_gdip.py #先进入到脚本目录再执行定时任务

定时时间为一天一次(启动脚本),因为脚本本身有循环检测ip的间隔时间。
执行后就会在脚本目录下生成一个udp_ip.txt文档。使用PHP来代理这个IP就可以了。

PHP代码

<?php
// udp.php
// Redirects a multicast channel request to the upstream stream proxy whose
// address ("ip:port") the Python scanner writes into udp_ip.txt.

// Channel id to forward, e.g. 239.94.2.49:5140
$channel = $_GET['id'] ?? '';

if (empty($channel)) {
    header("HTTP/1.1 400 Bad Request");
    echo "缺少频道参数";
    exit;
}

// Read the scanner's output file from the same directory.
// BUGFIX: the scanner saves to udp_ip.txt, not ip.txt — the original
// path made this script always answer 500 "配置文件不存在".
$configFile = __DIR__ . '/udp_ip.txt';
if (!file_exists($configFile)) {
    header("HTTP/1.1 500 Internal Server Error");
    echo "配置文件不存在";
    exit;
}

// Upstream server address in "ip:port" form.
$server = trim(file_get_contents($configFile));
if (empty($server)) {
    header("HTTP/1.1 500 Internal Server Error");
    echo "服务器地址配置为空";
    exit;
}

// Target URL on the upstream proxy. NOTE(review): the Python scanner
// validates the /rtp/ path while this builds /udp/ — confirm both are
// served by the upstream proxy.
$targetUrl = "http://{$server}/udp/{$channel}";

// Tell clients this is an M3U8 stream and allow cross-origin players.
header('Content-Type: application/vnd.apple.mpegurl');
header('Access-Control-Allow-Origin: *');
header('Access-Control-Allow-Methods: GET, OPTIONS');

// CORS preflight: headers only, no redirect.
if ($_SERVER['REQUEST_METHOD'] === 'OPTIONS') {
    exit;
}

// Redirect the player to the real stream URL.
header("Location: {$targetUrl}", true, 302);
exit;

访问格式:http://服务器ip:站点端口/脚本所在目录文件夹名称/udp.php?id=239.94.2.49:5140
后面的239.94.2.49:5140就是某电信组播的id