We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
使用特定ip当作执行命令的指令 用于爬虫无感切换IP
例如:当客户端访问特定地址 http://1.0.9.9 时,代理不转发该请求,而是触发一个动作——执行一段 shell 脚本、调用一个切换 IP 的 HTTP API,或者切换一个 SOCKS5 的账号密码。这对爬虫程序非常有用。
AI的例子
import asyncio import aiohttp import socket import struct import logging import random from aiohttp import web logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ProxyServer: def __init__(self, host='0.0.0.0', port=8080): self.host = host self.port = port self.upstream_proxies = [] # 上游代理池 self.current_ip = None async def handle_socks5(self, reader, writer): # SOCKS5 协议实现 # 身份验证 await reader.read(2) writer.write(b'\x05\x00') await writer.drain() # 请求 version, cmd, _, address_type = struct.unpack('!BBBB', await reader.read(4)) if address_type == 1: # IPv4 addr = socket.inet_ntoa(await reader.read(4)) elif address_type == 3: # Domain name domain_length = (await reader.read(1))[0] addr = (await reader.read(domain_length)).decode() else: writer.close() return port = struct.unpack('!H', await reader.read(2))[0] try: if addr == '1.0.9.9': # 特殊 IP 地址,触发代理切换 success = await self.switch_proxy() if success: writer.write(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00') else: writer.write(b'\x05\x01\x00\x01\x00\x00\x00\x00\x00\x00') await writer.drain() writer.close() return # 连接到目标服务器(通过上游代理) upstream_proxy = random.choice(self.upstream_proxies) remote_reader, remote_writer = await asyncio.open_connection( upstream_proxy['host'], upstream_proxy['port']) writer.write(b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00') await writer.drain() await self.proxy_data(reader, writer, remote_reader, remote_writer) except Exception as e: logger.error(f"Error in SOCKS5 proxy: {e}") writer.close() async def handle_http(self, request): if request.path == 'http://1.0.9.9/': # 特殊 URL,触发代理切换 success = await self.switch_proxy() if success: return web.Response(text="Proxy switched successfully", status=200) else: return web.Response(text="Failed to switch proxy", status=500) # 处理普通 HTTP 请求 upstream_proxy = random.choice(self.upstream_proxies) async with aiohttp.ClientSession() as session: async with session.request( method=request.method, url=request.url, 
headers=request.headers, data=await request.read(), proxy=f"http://{upstream_proxy['host']}:{upstream_proxy['port']}" ) as resp: headers = dict(resp.headers) body = await resp.read() return web.Response(body=body, headers=headers, status=resp.status) async def proxy_data(self, reader, writer, remote_reader, remote_writer): async def pipe(reader, writer): try: while True: data = await reader.read(8192) if not data: break writer.write(data) await writer.drain() except Exception as e: logger.error(f"Error in proxy_data: {e}") finally: writer.close() await asyncio.gather( pipe(reader, remote_writer), pipe(remote_reader, writer) ) async def switch_proxy(self): # 实现代理切换逻辑 # 这里可以是 HTTP API 调用、重新拨号或其他方式 # 示例使用 HTTP API async with aiohttp.ClientSession() as session: async with session.get('http://api.example.com/switch_proxy') as resp: if resp.status == 200: new_ip = await resp.text() if new_ip != self.current_ip: self.current_ip = new_ip logger.info(f"Switched to new IP: {self.current_ip}") return True return False async def start_server(self): socks_server = await asyncio.start_server( self.handle_socks5, self.host, self.port) app = web.Application() app.router.add_route('*', '/{path:.*}', self.handle_http) runner = web.AppRunner(app) await runner.setup() http_server = web.TCPSite(runner, self.host, self.port + 1) await asyncio.gather( socks_server.serve_forever(), http_server.start() ) if __name__ == '__main__': proxy_server = ProxyServer() proxy_server.upstream_proxies = [ {'host': 'proxy1.example.com', 'port': 8080}, {'host': 'proxy2.example.com', 'port': 8080}, # 添加更多上游代理 ] asyncio.run(proxy_server.start_server())
The text was updated successfully, but these errors were encountered:
No branches or pull requests
使用特定ip当作执行命令的指令 用于爬虫无感切换IP
例如:当客户端访问特定地址 http://1.0.9.9 时,代理不转发该请求,而是触发一个动作——执行一段 shell 脚本、调用一个切换 IP 的 HTTP API,或者切换一个 SOCKS5 的账号密码。这对爬虫程序非常有用。
AI的例子
The text was updated successfully, but these errors were encountered: