nova-api/api/proxies.py

147 lines
4.1 KiB
Python
Raw Normal View History

2023-07-19 23:51:28 +02:00
"""This module makes it easy to implement proxies by providing a class.."""
2023-06-28 15:21:14 +02:00
import os
import socket
2023-08-04 17:29:49 +02:00
import random
2023-06-28 15:21:14 +02:00
import asyncio
2023-07-25 02:42:53 +02:00
import aiohttp
import aiohttp_socks
2023-06-28 15:21:14 +02:00
2023-08-05 02:30:42 +02:00
from rich import print
2023-06-28 15:21:14 +02:00
from dotenv import load_dotenv
load_dotenv()
2023-08-12 17:49:31 +02:00
USE_PROXY_LIST = os.getenv('USE_PROXY_LIST', 'False').lower() == 'true'
2023-06-28 15:21:14 +02:00
class Proxy:
2023-08-13 11:12:08 +02:00
"""
### Represents a proxy.
The type can be either http, https, socks4 or socks5.
You can also pass a url, which will be parsed into the other attributes.
URL format:
2023-08-12 17:49:31 +02:00
[type]://[username:password@]host:port
2023-08-13 11:12:08 +02:00
"""
2023-06-28 15:21:14 +02:00
def __init__(self,
2023-08-04 17:29:49 +02:00
url: str=None,
2023-06-28 15:21:14 +02:00
proxy_type: str='http',
host_or_ip: str='127.0.0.1',
2023-06-28 15:21:14 +02:00
port: int=8080,
username: str=None,
password: str=None
):
2023-08-04 17:29:49 +02:00
if url:
proxy_type = url.split('://')[0]
url = url.split('://')[1]
if '@' in url:
username = url.split('@')[1].split(':')[0]
password = url.split('@')[1].split(':')[1]
host_or_ip = url.split(':')[0]
port = url.split(':')[1]
2023-06-28 15:21:14 +02:00
self.proxy_type = proxy_type
self.host_or_ip = host_or_ip
try:
self.ip_address = socket.gethostbyname(self.host_or_ip) # get ip address from host
except socket.gaierror:
self.ip_address = self.host_or_ip
self.host = self.host_or_ip
2023-06-28 15:21:14 +02:00
self.port = port
self.username = username
self.password = password
self.url = f'{self.proxy_type}://{self.username}:{self.password}@{self.host}:{self.port}'
2023-08-04 17:29:49 +02:00
self.url_ip = f'{self.proxy_type}://{self.username}:{self.password}@{self.ip_address}:{self.port}'
self.urls = {
'http': self.url,
'https': self.url
}
self.urls_httpx = {k + '://' :v for k, v in self.urls.items()}
self.proxies = self.url
2023-07-25 19:45:21 +02:00
print({
'proxy_type': self.proxy_type,
'host_or_ip': self.host_or_ip,
'ip_address': self.ip_address,
'host': self.host,
'port': self.port,
'username': self.username,
'password': self.password,
'url': self.url
})
2023-08-03 03:50:04 +02:00
@property
def connector(self):
2023-08-13 17:12:35 +02:00
"""
### Returns a proxy connector
Returns an aiohttp_socks.ProxyConnector object.
This can be used in aiohttp.ClientSession.
"""
2023-08-12 17:49:31 +02:00
2023-07-25 02:42:53 +02:00
proxy_types = {
'http': aiohttp_socks.ProxyType.HTTP,
'https': aiohttp_socks.ProxyType.HTTP,
'socks4': aiohttp_socks.ProxyType.SOCKS4,
'socks5': aiohttp_socks.ProxyType.SOCKS5
}
2023-08-03 03:50:04 +02:00
return aiohttp_socks.ProxyConnector(
2023-07-25 02:42:53 +02:00
proxy_type=proxy_types[self.proxy_type],
host=self.host,
2023-07-25 02:42:53 +02:00
port=self.port,
2023-08-13 11:12:08 +02:00
rdns=False,
2023-07-25 02:42:53 +02:00
username=self.username,
password=self.password
)
2023-08-13 11:12:08 +02:00
## Load proxies from their files
2023-08-12 17:49:31 +02:00
2023-08-04 17:29:49 +02:00
proxies_in_files = []
for proxy_type in ['http', 'socks4', 'socks5']:
try:
2023-08-05 02:30:42 +02:00
with open(f'secret/proxies/{proxy_type}.txt') as f:
2023-08-13 11:12:08 +02:00
for line in f:
clean_line = line.split('#', 1)[0].strip()
if clean_line:
proxies_in_files.append(f'{proxy_type}://{clean_line}')
except FileNotFoundError:
pass
2023-08-04 17:29:49 +02:00
2023-08-13 11:12:08 +02:00
## Manages the proxy list
2023-08-12 17:49:31 +02:00
class ProxyLists:
2023-08-04 17:29:49 +02:00
def __init__(self):
random_proxy = random.choice(proxies_in_files)
self.get_random = Proxy(url=random_proxy)
self.connector = aiohttp_socks.ChainProxyConnector.from_urls(proxies_in_files)
2023-08-12 17:49:31 +02:00
def get_proxy() -> Proxy:
2023-08-13 11:12:08 +02:00
"""
### Returns a Proxy object
The proxy is either from the proxy list or from the environment variables.
"""
print('URL:\t' + ProxyLists().get_random.url)
2023-08-12 17:49:31 +02:00
if USE_PROXY_LIST:
return ProxyLists().get_random
return Proxy(
proxy_type=os.getenv('PROXY_TYPE', 'http'),
host_or_ip=os.getenv('PROXY_HOST', '127.0.0.1'),
port=int(os.getenv('PROXY_PORT', '8080')),
username=os.getenv('PROXY_USER'),
password=os.getenv('PROXY_PASS')
)
if __name__ == '__main__':
print(get_proxy().url)
print(get_proxy().connector)