File size: 12,523 Bytes
b190b45 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
"""
Smart Proxy/DNS Manager
Handles proxy rotation for sanctioned exchanges (Binance, etc.)
"""
import asyncio
import aiohttp
import random
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
@dataclass
class ProxyServer:
    """Proxy server configuration plus running health statistics.

    ``url`` is the ``host:port`` part only; the scheme comes from
    ``protocol`` and optional credentials from ``username``/``password``.
    The counters and ``avg_response_time`` are updated through
    ``record_success`` and ``record_failure``.
    """

    url: str
    protocol: str = "http"  # http, https, socks5
    username: Optional[str] = None
    password: Optional[str] = None
    success_count: int = 0
    failure_count: int = 0
    last_used: Optional[datetime] = None
    avg_response_time: float = 0.0
    is_active: bool = True

    def get_proxy_url(self) -> str:
        """Return the full proxy URL, including credentials when both are set.

        ``username`` and ``password`` are treated as raw (unencoded) text and
        are percent-encoded here, so characters such as ``@``, ``:`` or ``/``
        inside a password cannot corrupt the URL structure. Credentials are
        only included when *both* username and password are non-empty.
        """
        # Local import keeps this block self-contained (the file already uses
        # function-scope imports elsewhere).
        from urllib.parse import quote

        if self.username and self.password:
            user = quote(self.username, safe="")
            secret = quote(self.password, safe="")
            return f"{self.protocol}://{user}:{secret}@{self.url}"
        return f"{self.protocol}://{self.url}"

    def record_success(self, response_time: float):
        """Record a successful request that took ``response_time`` seconds.

        The first sample seeds the average; later samples are blended in as
        an exponential moving average (70% previous value, 30% new sample).
        """
        self.success_count += 1
        self.last_used = datetime.now()
        if self.avg_response_time == 0:
            self.avg_response_time = response_time
        else:
            self.avg_response_time = 0.7 * self.avg_response_time + 0.3 * response_time

    def record_failure(self):
        """Record a failed request and deactivate the proxy after too many.

        The failure counter is cumulative (successes do not reset it); once it
        exceeds 10 the proxy is marked inactive.
        """
        self.failure_count += 1
        self.last_used = datetime.now()
        # Deactivate if too many failures
        if self.failure_count > 10:
            self.is_active = False

    def get_success_rate(self) -> float:
        """Return successes / total attempts, or 0.0 when nothing was recorded."""
        total = self.success_count + self.failure_count
        # max(..., 1) avoids division by zero for a never-used proxy.
        return self.success_count / max(total, 1)
@dataclass
class DNSServer:
    """A DNS server entry: where it lives, how to reach it, and usage counters."""

    address: str
    port: int = 53
    protocol: str = "udp"  # udp, tcp, doh (DNS over HTTPS)
    is_active: bool = True
    success_count: int = 0
    failure_count: int = 0

    def get_address(self) -> str:
        """Return the server endpoint formatted as ``address:port``."""
        endpoint = "{}:{}".format(self.address, self.port)
        return endpoint
class SmartProxyManager:
    """
    Smart proxy manager with rotation and health tracking.

    Holds a pool of ``ProxyServer`` entries and a list of ``DNSServer``
    entries. ``get_proxy`` returns the healthiest active proxy (highest
    success rate, then lowest average response time); the rotation index
    decides between equally ranked proxies so that rotation actually moves
    on to another proxy.
    """

    def __init__(self):
        """Create the manager and load the default proxies and DNS servers."""
        self.proxies: List[ProxyServer] = []
        self.dns_servers: List[DNSServer] = []
        self.current_proxy_index = 0
        self.rotation_enabled = True
        self.rotation_interval = 60  # Rotate every 60 seconds
        self.last_rotation = datetime.now()

        # Initialize with free/public proxies
        self._load_default_proxies()
        self._load_default_dns()

        logger.info(
            f"✅ SmartProxyManager initialized with {len(self.proxies)} proxies "
            f"and {len(self.dns_servers)} DNS servers"
        )

    @staticmethod
    def _split_scheme(proxy_url: str, default_protocol: str = "http") -> tuple:
        """Split ``scheme://rest`` into ``(scheme, rest)``.

        Returns ``(default_protocol, proxy_url)`` unchanged when the value
        carries no scheme.
        """
        scheme, separator, remainder = proxy_url.partition("://")
        if separator and scheme:
            return scheme.lower(), remainder
        return default_protocol, proxy_url

    def _load_default_proxies(self):
        """Load the built-in proxy list plus the optional ``PROXY_URL`` env proxy."""
        import os

        # Free proxy list (you can expand this)
        default_proxies = [
            # Public HTTP proxies (example - replace with real ones)
            "proxy1.example.com:8080",
            "proxy2.example.com:3128",
            # SOCKS5 proxies
            "socks5://proxy3.example.com:1080",
        ]
        # Note: In production, use a proxy provider service
        # or rotate through a large list of tested proxies
        # NOTE(review): confirm the HTTP client used below can actually
        # handle socks5:// proxy URLs.
        for proxy_url in default_proxies:
            protocol, url = self._split_scheme(proxy_url)
            self.proxies.append(ProxyServer(url=url, protocol=protocol))

        # Add environment-based proxies. PROXY_URL commonly already contains
        # a scheme; split it off so get_proxy_url() does not emit
        # "http://http://...".
        env_proxy = os.getenv("PROXY_URL", "").strip()
        if env_proxy:
            protocol, url = self._split_scheme(env_proxy)
            self.proxies.append(ProxyServer(url=url, protocol=protocol))

    def _load_default_dns(self):
        """Load default smart DNS servers."""
        # Public DNS servers
        self.dns_servers = [
            DNSServer(address="1.1.1.1", port=53),  # Cloudflare
            DNSServer(address="8.8.8.8", port=53),  # Google
            DNSServer(address="9.9.9.9", port=53),  # Quad9
            DNSServer(address="208.67.222.222", port=53),  # OpenDNS
        ]

    async def get_proxy(self) -> Optional[str]:
        """Return the URL of the best active proxy, or None if none is usable.

        When rotation is enabled and more than ``rotation_interval`` seconds
        have elapsed since ``last_rotation``, the rotation index is advanced
        first. Proxies are ranked by success rate, then by lower average
        response time; ties go to the first proxy at or after the rotation
        index.
        """
        if not self.proxies:
            logger.warning("⚠️ No proxies configured")
            return None

        # Check if rotation needed
        if self.rotation_enabled:
            now = datetime.now()
            # total_seconds() covers the whole delta; .seconds would drop the
            # days component and wrap around every 24 hours.
            if (now - self.last_rotation).total_seconds() > self.rotation_interval:
                self._rotate_proxy()
                self.last_rotation = now

        # Start the candidate list at the rotation index so equally ranked
        # proxies are picked in rotation order. The modulo keeps the index
        # valid after remove_proxy() has shrunk the pool.
        start = self.current_proxy_index % len(self.proxies)
        candidates = self.proxies[start:] + self.proxies[:start]
        active_proxies = [p for p in candidates if p.is_active]
        if not active_proxies:
            logger.error("❌ All proxies are inactive!")
            return None

        # Best = highest success rate, then lowest average response time.
        # max() returns the first maximal element, preserving the rotation
        # order among ties.
        best_proxy = max(
            active_proxies,
            key=lambda p: (p.get_success_rate(), -p.avg_response_time),
        )
        proxy_url = best_proxy.get_proxy_url()
        logger.debug(f"🔒 Using proxy: {best_proxy.url} (success rate: {best_proxy.get_success_rate():.1%})")
        return proxy_url

    def _rotate_proxy(self):
        """Advance the rotation index to the next proxy (no-op for 0 or 1 proxies)."""
        if len(self.proxies) > 1:
            self.current_proxy_index = (self.current_proxy_index + 1) % len(self.proxies)
            logger.debug(f"🔄 Rotated to proxy #{self.current_proxy_index}")

    async def test_proxy(self, proxy: ProxyServer, test_url: str = "https://httpbin.org/ip") -> bool:
        """Issue a GET to ``test_url`` through ``proxy`` and record the outcome.

        Returns True on an HTTP 200 response (recording a success with the
        measured response time); any other status or any exception records a
        failure and returns False.
        """
        try:
            # monotonic clock: durations must not be affected by wall-clock jumps.
            start_time = time.monotonic()
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    test_url,
                    proxy=proxy.get_proxy_url(),
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as response:
                    if response.status == 200:
                        response_time = time.monotonic() - start_time
                        proxy.record_success(response_time)
                        logger.info(f"✅ Proxy {proxy.url} is working ({response_time:.2f}s)")
                        return True
            proxy.record_failure()
            return False
        except Exception as e:
            proxy.record_failure()
            logger.warning(f"⚠️ Proxy {proxy.url} failed: {e}")
            return False

    async def test_all_proxies(self):
        """Test every configured proxy concurrently and log how many passed."""
        logger.info("🧪 Testing all proxies...")
        tasks = [self.test_proxy(proxy) for proxy in self.proxies]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Only a literal True counts; exceptions returned by gather do not.
        active_count = sum(1 for r in results if r is True)
        logger.info(f"✅ {active_count}/{len(self.proxies)} proxies are active")

    def add_proxy(
        self,
        url: str,
        protocol: str = "http",
        username: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """Append a new proxy (``url`` is ``host:port``, without a scheme) to the pool."""
        proxy = ProxyServer(
            url=url,
            protocol=protocol,
            username=username,
            password=password,
        )
        self.proxies.append(proxy)
        logger.info(f"➕ Added proxy: {url}")

    def remove_proxy(self, url: str):
        """Remove every proxy whose ``url`` equals ``url``."""
        self.proxies = [p for p in self.proxies if p.url != url]
        logger.info(f"➖ Removed proxy: {url}")

    def get_dns_server(self) -> str:
        """Return a randomly chosen active DNS server as ``address:port``.

        Falls back to ``8.8.8.8:53`` when no DNS server is active.
        """
        active_dns = [d for d in self.dns_servers if d.is_active]
        if not active_dns:
            return "8.8.8.8:53"  # Fallback to Google DNS
        # Random selection
        dns = random.choice(active_dns)
        return dns.get_address()

    async def resolve_with_smart_dns(self, hostname: str) -> Optional[str]:
        """Resolve ``hostname`` to an IP address string, or None on failure.

        The selected DNS server is only logged: the lookup itself goes
        through the system resolver. The blocking lookup is run in the
        default executor so the event loop is not stalled.
        """
        import socket

        dns_server = self.get_dns_server()
        logger.debug(f"🔍 Resolving {hostname} using DNS: {dns_server}")
        loop = asyncio.get_running_loop()
        try:
            # Use system DNS (we can't easily override without dnspython)
            ip = await loop.run_in_executor(None, socket.gethostbyname, hostname)
            logger.debug(f"✅ Resolved {hostname} -> {ip}")
            return ip
        except socket.gaierror as e:
            logger.error(f"❌ DNS resolution failed for {hostname}: {e}")
            return None

    def get_status_report(self) -> Dict:
        """Return a summary dict of pool sizes, rotation settings and per-proxy stats."""
        active_total = sum(1 for p in self.proxies if p.is_active)
        return {
            "total_proxies": len(self.proxies),
            "active_proxies": active_total,
            "inactive_proxies": len(self.proxies) - active_total,
            "dns_servers": len(self.dns_servers),
            "rotation_enabled": self.rotation_enabled,
            "rotation_interval": self.rotation_interval,
            "proxies": [
                {
                    "url": p.url,
                    "protocol": p.protocol,
                    "is_active": p.is_active,
                    "success_rate": p.get_success_rate(),
                    "avg_response_time": p.avg_response_time,
                    "success_count": p.success_count,
                    "failure_count": p.failure_count,
                }
                for p in self.proxies
            ],
        }

    def _find_proxy(self, proxy_url: Optional[str]) -> Optional[ProxyServer]:
        """Return the pool entry whose full proxy URL equals ``proxy_url``, if any."""
        if not proxy_url:
            return None
        return next((p for p in self.proxies if p.get_proxy_url() == proxy_url), None)

    async def fetch_with_proxy_rotation(
        self,
        url: str,
        max_retries: int = 3,
        **kwargs
    ) -> Optional[Dict]:
        """GET ``url`` and return the decoded JSON body, retrying on failure.

        Each attempt asks ``get_proxy`` for a proxy (falling back to a direct
        connection when none is available). On any exception the failure is
        recorded against the proxy used, the rotation index is advanced, and
        the next attempt is made; the exception from the final attempt is
        re-raised. Extra ``kwargs`` are forwarded to ``session.get``.

        Returns None only when ``max_retries`` is not positive.
        """
        for attempt in range(max_retries):
            proxy_url = await self.get_proxy()
            if not proxy_url:
                logger.warning("⚠️ No proxy available, trying direct connection")
                proxy_url = None
            proxy = self._find_proxy(proxy_url)

            try:
                start_time = time.monotonic()
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        url,
                        proxy=proxy_url,
                        timeout=aiohttp.ClientTimeout(total=15),
                        **kwargs
                    ) as response:
                        response.raise_for_status()
                        response_time = time.monotonic() - start_time
                        # Decode before recording the outcome so a bad body
                        # counts as one failure, not a success plus a failure.
                        payload = await response.json()
            except Exception as e:
                logger.warning(f"⚠️ Proxy attempt {attempt + 1} failed: {e}")
                # Record failure
                if proxy is not None:
                    proxy.record_failure()
                # Rotate to next proxy
                self._rotate_proxy()
                # If last attempt, raise
                if attempt == max_retries - 1:
                    raise
            else:
                # Record success
                if proxy is not None:
                    proxy.record_success(response_time)
                return payload
        return None
# Module-wide manager instance; stays None until get_proxy_manager() is first called.
_proxy_manager = None


def get_proxy_manager() -> SmartProxyManager:
    """Return the shared SmartProxyManager, creating it on the first call."""
    global _proxy_manager
    if _proxy_manager is not None:
        return _proxy_manager
    _proxy_manager = SmartProxyManager()
    return _proxy_manager
|