组播IP扫描Python脚本
import asyncio
import ipaddress
import logging
import os
import sys
import time
from datetime import datetime
from typing import List, Optional, Tuple
import aiohttp
from aiohttp import ClientTimeout, ClientError
# ================== 配置区域 ==================
# 修改为动态C段范围:135-255
START_C_SEGMENT = 145
END_C_SEGMENT = 255
BASE_IP = "125.66"
PORT = 8848
STREAM_PATH = "/rtp/239.94.2.52:5140"
CHECK_INTERVAL = 600 # 10分钟(秒)
MAX_CONCURRENT = 300 # 针对B段扫描优化并发
TIMEOUT = 5 # 连接超时(秒)
STREAM_TIMEOUT = 8 # 流验证超时(秒)
PROGRESS_INTERVAL = 500 # 每扫描500个IP显示一次进度
# ============================================
# 全局变量
found_ip: Optional[str] = None
scanned_count = 0
validated_count = 0
total_ips = 0
start_time = None
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('scan.log', encoding='utf-8')
]
)
logger = logging.getLogger(__name__)
def ensure_ip_file():
"""确保 udp_ip.txt 存在"""
if not os.path.exists("udp_ip.txt"):
with open("udp_ip.txt", "w", encoding="utf-8") as f:
pass
logger.info("已创建 udp_ip.txt 文件")
def load_saved_ip() -> Optional[str]:
"""从 udp_ip.txt 读取已保存的 IP"""
try:
with open("udp_ip.txt", "r", encoding="utf-8") as f:
content = f.read().strip()
return content if content else None
except FileNotFoundError:
return None
def save_ip(ip: str):
"""保存有效 IP 到文件"""
with open("udp_ip.txt", "w", encoding="utf-8") as f:
f.write(f"{ip}:{PORT}")
logger.info(f"💾 已保存有效 IP: {ip}:{PORT}")
def generate_ip_range(start_c: int, end_c: int, base_ip: str) -> List[str]:
"""生成指定C段范围内的所有IP地址"""
ips = []
for c_segment in range(start_c, end_c + 1):
for d_segment in range(0, 256):
ip = f"{base_ip}.{c_segment}.{d_segment}"
ips.append(ip)
return ips
async def check_port(ip: str) -> bool:
"""检查单个 IP 的端口是否开放"""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ip, PORT),
timeout=TIMEOUT
)
writer.close()
await writer.wait_closed()
return True
except (asyncio.TimeoutError, ConnectionRefusedError, OSError) as e:
logger.debug(f"端口检查失败 {ip}:{PORT} - {type(e).__name__}")
return False
async def verify_stream(session: aiohttp.ClientSession, ip: str) -> bool:
"""验证流媒体是否可以播放"""
url = f"http://{ip}:{PORT}{STREAM_PATH}"
try:
# 只获取头部信息,不下载整个流
async with session.get(url, timeout=ClientTimeout(total=STREAM_TIMEOUT)) as response:
# 检查HTTP状态码
if response.status == 200:
logger.debug(f"流验证成功 {url} - 状态码: {response.status}")
return True
else:
logger.debug(f"流验证失败 {url} - 状态码: {response.status}")
return False
except (ClientError, asyncio.TimeoutError) as e:
logger.debug(f"流验证异常 {url} - {type(e).__name__}: {str(e)[:100]}")
return False
async def scan_worker(
semaphore: asyncio.Semaphore,
ip_queue: asyncio.Queue,
session: aiohttp.ClientSession
):
"""工作协程:从队列取 IP 并检查"""
global scanned_count, validated_count, found_ip
while True:
ip = await ip_queue.get()
if found_ip: # 已找到有效 IP,停止工作
ip_queue.task_done()
break
async with semaphore:
scanned_count += 1
# 第一步:检查端口是否开放
if not await check_port(ip):
ip_queue.task_done()
continue
validated_count += 1
logger.debug(f"端口开放,开始验证流: {ip}:{PORT}")
# 第二步:验证流媒体是否可播放
if await verify_stream(session, ip):
found_ip = ip
logger.info(f"✅ 发现有效 IP(端口+流验证通过): {ip}:{PORT}")
save_ip(ip)
ip_queue.task_done()
break
ip_queue.task_done()
async def scan_network() -> Optional[str]:
"""扫描指定C段范围"""
global scanned_count, validated_count, total_ips, start_time, found_ip
found_ip = None
scanned_count = 0
validated_count = 0
# 生成指定C段范围的IP
ips = generate_ip_range(START_C_SEGMENT, END_C_SEGMENT, BASE_IP)
total_ips = len(ips)
start_time = datetime.now()
logger.info(f"🚀 开始扫描 {BASE_IP}.{START_C_SEGMENT}.0-{BASE_IP}.{END_C_SEGMENT}.255:{PORT}{STREAM_PATH}")
logger.info(f" 总IP数: {total_ips}, 并发数: {MAX_CONCURRENT}")
# 创建HTTP会话
connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT, ssl=False)
timeout = ClientTimeout(total=STREAM_TIMEOUT + 2)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
# 创建信号量控制并发
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
ip_queue = asyncio.Queue(maxsize=MAX_CONCURRENT * 2)
# 启动工作协程
workers = []
for _ in range(MAX_CONCURRENT):
worker = asyncio.create_task(scan_worker(semaphore, ip_queue, session))
workers.append(worker)
# 填充队列
for i, ip in enumerate(ips):
if found_ip:
break
await ip_queue.put(ip)
# 显示进度
if i % PROGRESS_INTERVAL == 0 and i > 0:
elapsed = (datetime.now() - start_time).total_seconds()
rate = scanned_count / elapsed if elapsed > 0 else 0
remaining = total_ips - scanned_count
eta = remaining / rate if rate > 0 else 0
logger.info(f"📊 进度: {scanned_count}/{total_ips} "
f"({scanned_count / total_ips * 100:.1f}%) "
f"速率: {rate:.1f}/s "
f"已验证: {validated_count} "
f"预计剩余: {eta:.0f}s")
# 等待所有任务完成
await ip_queue.join()
# 取消剩余工作协程
for worker in workers:
worker.cancel()
elapsed = (datetime.now() - start_time).total_seconds()
logger.info(f"🏁 扫描完成,耗时 {elapsed:.2f} 秒")
logger.info(f" 扫描: {scanned_count}/{total_ips}, 验证: {validated_count}")
return found_ip
async def verify_saved_ip() -> bool:
"""验证已保存的 IP 是否仍然有效(包括流验证)"""
saved_ip = load_saved_ip()
if not saved_ip:
return False
ip = saved_ip.split(":")[0]
logger.info(f"🔍 验证已保存的 IP: {saved_ip}")
# 第一步:检查端口
if not await check_port(ip):
logger.warning(f"❌ 端口已关闭: {saved_ip}")
return False
# 第二步:验证流
connector = aiohttp.TCPConnector(ssl=False)
timeout = ClientTimeout(total=STREAM_TIMEOUT + 2)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
if await verify_stream(session, ip):
logger.info(f"✅ 已保存的 IP 仍然有效: {saved_ip}")
return True
else:
logger.warning(f"❌ 流验证失败: {saved_ip}")
return False
async def main_loop():
"""主循环:每10分钟检查一次"""
# ============== 关键修改:切换到脚本所在目录 ==============
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
logger.info(f"切换到脚本所在目录: {script_dir}")
# ====================================================
ensure_ip_file()
# 初始验证
saved_ip = load_saved_ip()
if saved_ip:
logger.info(f"📄 检测到已保存的 IP: {saved_ip}")
while True:
logger.info("=" * 70)
logger.info(f"⏰ 开始新一轮检查(间隔 {CHECK_INTERVAL // 60} 分钟)")
logger.info(f"🎯 目标: {BASE_IP}.{START_C_SEGMENT}.0-{BASE_IP}.{END_C_SEGMENT}.255:{PORT}{STREAM_PATH}")
# 1. 先检查已保存的 IP
if await verify_saved_ip():
logger.info("✅ IP 有效,跳过扫描")
else:
# 2. 需要重新扫描
logger.info("🔄 开始扫描指定C段范围...")
result = await scan_network()
if not result:
logger.warning("⚠️ 未找到有效 IP")
else:
logger.info(f"🎉 找到新有效 IP: {result}:{PORT}")
# 3. 等待下一次检查
next_check = datetime.fromtimestamp(time.time() + CHECK_INTERVAL)
logger.info(f"⏳ 下次检查时间: {next_check.strftime('%Y-%m-%d %H:%M:%S')}")
await asyncio.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
try:
# 检查aiohttp是否安装
import aiohttp
except ImportError:
print("❌ 缺少依赖库 aiohttp,请先安装:")
print(" pip install aiohttp")
sys.exit(1)
try:
asyncio.run(main_loop())
except KeyboardInterrupt:
logger.info("👋 程序被用户中断")
except Exception as e:
logger.error(f"💥 程序异常: {e}", exc_info=True)
在宝塔面板创建一个站点把脚本上传到相应的文件夹中,如scan_gdip.py,PHP版本7+以上吧
查看Python解释器的位置,执行命令:
which python
或
which python3
一般都是返回:/usr/bin/python3
方法一
哪么在计划任务里的shell执行脚本就是:/usr/bin/python3 /你的py程序路径/scan_gdip.py,如:
/usr/bin/python3 /www/wwwroot/103.193.151.158_8027/scan/scan_gdip.py
不知路径是什么,就对着你要执行的程序鼠标右键属性,复制它显示的路径就是了。
方法二(强烈推荐)
pkill -f scan_gdip.py #结束上一次执行的进程
cd /www/wwwroot/192.168.9.105_6607/scan/gd && python3 scan_gdip.py #先进入到脚本目录再执行定时任务
定时时间为一天一次(启动脚本),因为脚本本身有循环检测ip的间隔时间
执行后就会脚本目录下生成一个udp_ip.txt文档的。使用PHP来代理这个IP就可以了
PHP代码
<?php
// udp.php
// 获取要转发的频道ID
$channel = $_GET['id'] ?? '';
if (empty($channel)) {
header("HTTP/1.1 400 Bad Request");
echo "缺少频道参数";
exit;
}
// 读取同目录下的udp_ip.txt文件
$configFile = __DIR__ . '/ip.txt';
if (!file_exists($configFile)) {
header("HTTP/1.1 500 Internal Server Error");
echo "配置文件不存在";
exit;
}
// 读取服务器地址
$server = trim(file_get_contents($configFile));
if (empty($server)) {
header("HTTP/1.1 500 Internal Server Error");
echo "服务器地址配置为空";
exit;
}
// 构建目标URL
$targetUrl = "http://{$server}/udp/{$channel}";
// 设置响应头,告诉客户端这是M3U8流
header('Content-Type: application/vnd.apple.mpegurl');
header('Access-Control-Allow-Origin: *');
header('Access-Control-Allow-Methods: GET, OPTIONS');
// 如果是OPTIONS请求,直接返回
if ($_SERVER['REQUEST_METHOD'] === 'OPTIONS') {
exit;
}
// 重定向到实际地址
header("Location: {$targetUrl}", true, 302);
exit;
访问格式:http://服务器ip:站点端口/脚本所在目录文件夹名称/scip.php?id=239.94.2.49:5140
后面的239.94.2.49:5140就是某电信组播的id