From bf7a6b565ae035bd04956770e55fba3c307d5c51 Mon Sep 17 00:00:00 2001 From: nsde Date: Fri, 4 Aug 2023 17:29:49 +0200 Subject: [PATCH] proxies have issues --- api/config/credits.yml | 4 +- api/db/logs.py | 3 +- api/helpers/chat.py | 58 ++++++++++++++++++++ api/helpers/network.py | 18 +++++++ api/load_balancing.py | 23 +++++--- api/proxies.py | 91 ++++++++++++++++++++++++------- api/streaming.py | 119 ++++++++++++++++++++++++++++------------- api/transfer.py | 2 - 8 files changed, 251 insertions(+), 67 deletions(-) create mode 100644 api/helpers/chat.py diff --git a/api/config/credits.yml b/api/config/credits.yml index e2f6807..e79cf8c 100644 --- a/api/config/credits.yml +++ b/api/config/credits.yml @@ -6,7 +6,7 @@ costs: chat-models: gpt-3: 10 - gpt-4: 75 + gpt-4: 30 gpt-4-32k: 100 # bonuses are multiplier for costs: @@ -20,4 +20,4 @@ bonuses: # discord reward 0.99^lvl? rewards: - day: 1000 + day: 250 diff --git a/api/db/logs.py b/api/db/logs.py index 5f9d851..06868ac 100644 --- a/api/db/logs.py +++ b/api/db/logs.py @@ -19,6 +19,7 @@ async def log_api_request(user: dict, incoming_request, target_url: str): last_prompt = payload['messages'][-1]['content'] model = payload.get('model') + ip_address = await network.get_ip(incoming_request) new_log_item = { 'timestamp': time.time(), @@ -26,7 +27,7 @@ async def log_api_request(user: dict, incoming_request, target_url: str): 'path': incoming_request.url.path, 'user_id': user['_id'], 'security': { - 'ip': network.get_ip(incoming_request), + 'ip': ip_address, 'useragent': incoming_request.headers.get('User-Agent') }, 'details': { diff --git a/api/helpers/chat.py b/api/helpers/chat.py new file mode 100644 index 0000000..2526241 --- /dev/null +++ b/api/helpers/chat.py @@ -0,0 +1,58 @@ +import string +import random +import asyncio + +from rich import print + +class CompletionStart: + """Beinning of a chat""" + +class CompletionStop: + """End of a chat""" + +async def create_chat_id() -> str: + chars = string.ascii_letters + string.digits + chat_id = ''.join(random.choices(chars, k=32)) + + return f'chatcmpl-{chat_id}' + +def create_chat_chunk(chat_id: str, model: str, content=None) -> dict: + content = content or {} + + delta = {} + + if content: + delta = { + 'content': content + } + + if not isinstance(content, str): + delta = { + 'role': 'assistant' + } + + chunk = { + 'id': chat_id, + 'object': 'chat.completion.chunk', + 'created': 0, + 'model': model, + 'choices': [ + { + 'delta': delta, + 'index': 0, + 'finish_reason': None if not(isinstance(content, str)) else 'stop' + } + ], + } + print(chunk) + + return chunk + +if __name__ == '__main__': + demo_chat_id = asyncio.run(create_chat_id()) + print(demo_chat_id) + print(asyncio.run(create_chat_chunk( + model='gpt-4', + content='Hello', + chat_id=demo_chat_id, + ))) diff --git a/api/helpers/network.py b/api/helpers/network.py index eb19198..26d810c 100644 --- a/api/helpers/network.py +++ b/api/helpers/network.py @@ -1,2 +1,20 @@ +import base64 +import asyncio + async def get_ip(request) -> str: return request.client.host + +async def add_proxy_auth_to_headers(username: str, password: str, headers: dict) -> dict: + proxy_auth = base64.b64encode(f'{username}:{password}'.encode()).decode() + headers['Proxy-Authorization'] = f'Basic {proxy_auth}' + return headers + +if __name__ == '__main__': + print(asyncio.run(add_proxy_auth_to_headers( + 'user', + 'pass', + { + 'Authorization': 'Bearer demo', + 'Another-Header': '123' + } + ))) diff --git a/api/load_balancing.py b/api/load_balancing.py index b718540..6fa4696 100644 --- a/api/load_balancing.py +++ b/api/load_balancing.py @@ -5,12 +5,18 @@ import chat_providers provider_modules = [ chat_providers.twa, - chat_providers.quantum, - chat_providers.churchless, - chat_providers.closed, - chat_providers.closed4 + # chat_providers.quantum, + # chat_providers.churchless, + # chat_providers.closed, + # chat_providers.closed4 ] +def _get_module_name(module) -> str: + name = module.__name__ + if '.' in name: + return name.split('.')[-1] + return name + async def balance_chat_request(payload: dict) -> dict: providers_available = [] @@ -24,7 +30,10 @@ async def balance_chat_request(payload: dict) -> dict: providers_available.append(provider_module) provider = random.choice(providers_available) - return provider.chat_completion(**payload) + target = provider.chat_completion(**payload) + target['module'] = _get_module_name(provider) + + return target async def balance_organic_request(request: dict) -> dict: providers_available = [] @@ -34,8 +43,10 @@ async def balance_organic_request(request: dict) -> dict: providers_available.append(provider_module) provider = random.choice(providers_available) + target = provider.organify(request) + target['module'] = _get_module_name(provider) - return provider.organify(request) + return target if __name__ == '__main__': req = asyncio.run(balance_chat_request(payload={'model': 'gpt-3.5-turbo', 'stream': True})) diff --git a/api/proxies.py b/api/proxies.py index 10f5e33..ee7348d 100644 --- a/api/proxies.py +++ b/api/proxies.py @@ -2,6 +2,7 @@ import os import socket +import random import asyncio import aiohttp import aiohttp_socks @@ -14,21 +15,34 @@ class Proxy: """Represents a proxy. The type can be either http, https, socks4 or socks5.""" def __init__(self, + url: str=None, proxy_type: str='http', host_or_ip: str='127.0.0.1', port: int=8080, username: str=None, password: str=None ): + if url: + proxy_type = url.split('://')[0] + url = url.split('://')[1] + + if '@' in url: + username = url.split('@')[1].split(':')[0] + password = url.split('@')[1].split(':')[1] + + host_or_ip = url.split(':')[0] + port = url.split(':')[1] + self.proxy_type = proxy_type self.host_or_ip = host_or_ip - self.ip_address = socket.gethostbyname(self.host_or_ip) if host_or_ip[0].isdigit() else host_or_ip + self.ip_address = socket.gethostbyname(self.host_or_ip) #if host_or_ip.replace('.', '').isdigit() else host_or_ip self.host = self.host_or_ip self.port = port self.username = username self.password = password self.url = f'{self.proxy_type}://{self.username}:{self.password}@{self.host}:{self.port}' + self.url_ip = f'{self.proxy_type}://{self.username}:{self.password}@{self.ip_address}:{self.port}' self.urls = { 'http': self.url, 'https': self.url @@ -37,21 +51,6 @@ class Proxy: self.urls_httpx = {k + '://' :v for k, v in self.urls.items()} self.proxies = self.url - async def initialize_connector(self, connector): - async with aiohttp.ClientSession( - connector=connector, - timeout=aiohttp.ClientTimeout(total=10), - raise_for_status=True - ) as session: - async with session.request( - method='get', - url='https://checkip.amazonaws.com', - headers={'Content-Type': 'application/json'} - ) as response: - detected_ip = await response.text() - print(f'Detected IP: {detected_ip}') - return detected_ip.strip() - @property def connector(self): proxy_types = { @@ -63,13 +62,33 @@ class Proxy: return aiohttp_socks.ProxyConnector( proxy_type=proxy_types[self.proxy_type], - host=self.host, + host=self.ip_address, port=self.port, rdns=False, username=self.username, password=self.password ) +proxies_in_files = [] + +for proxy_type in ['http', 'socks4', 'socks5']: + with open(f'secret/proxies/{proxy_type}.txt') as f: + for line in f.readlines(): + if line.strip() and not line.strip().startswith('#'): + if '#' in line: + line = line.split('#')[0] + + proxies_in_files.append(f'{proxy_type}://{line.strip()}') + +class ProxyChain: + def __init__(self): + random_proxy = random.choice(proxies_in_files) + + self.get_random = Proxy(url=random_proxy) + self.connector = aiohttp_socks.ChainProxyConnector.from_urls(proxies_in_files) + +default_chain = ProxyChain() + default_proxy = Proxy( proxy_type=os.getenv('PROXY_TYPE', 'http'), host_or_ip=os.getenv('PROXY_HOST', '127.0.0.1'), @@ -78,6 +97,8 @@ default_proxy = Proxy( password=os.getenv('PROXY_PASS') ) +random_proxy = ProxyChain().get_random + def test_httpx_workaround(): import httpx @@ -106,7 +127,41 @@ async def test_aiohttp_socks(): html = await response.text() return html.strip() +async def streaming_aiohttp_socks(): + async with aiohttp.ClientSession(connector=default_proxy.connector) as session: + async with session.post( + 'https://free.churchless.tech/v1/chat/completions', + json={ + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "user", + "content": "Hi" + } + ], + "stream": True + }, + # headers={ + # 'Authorization': 'Bearer MyDiscord' + # } + ) as response: + html = await response.text() + return html.strip() + +async def text_httpx_socks(): + import httpx + from httpx_socks import AsyncProxyTransport + + print(default_proxy.url_ip) + + transport = AsyncProxyTransport.from_url(default_proxy.url_ip) + async with httpx.AsyncClient(transport=transport) as client: + res = await client.get('https://checkip.amazonaws.com') + return res.text + if __name__ == '__main__': # print(test_httpx()) # print(test_requests()) - print(asyncio.run(test_aiohttp_socks())) + # print(asyncio.run(test_aiohttp_socks())) + # print(asyncio.run(streaming_aiohttp_socks())) + print(asyncio.run(text_httpx_socks())) diff --git a/api/streaming.py b/api/streaming.py index d1ca49f..a82798c 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -1,17 +1,18 @@ import os import yaml +import json import asyncio import aiohttp import starlette +from rich import print from dotenv import load_dotenv import proxies import load_balancing from db import logs, users, stats -from rich import print -from helpers import network +from helpers import network, chat load_dotenv() @@ -38,21 +39,38 @@ async def stream( incoming_request: starlette.requests.Request=None, ): payload = payload or DEMO_PAYLOAD + is_chat = False - if 'chat/completions' in path: # is a chat endpoint - target_request = await load_balancing.balance_chat_request(payload) - else: - target_request = await load_balancing.balance_organic_request(payload) + if 'chat/completions' in path: + is_chat = True + chat_id = await chat.create_chat_id() + model = payload['model'] - headers = { - 'Content-Type': 'application/json' - } + chat_chunk = chat.create_chat_chunk( + chat_id=chat_id, + model=model, + content=chat.CompletionStart + ) + data = json.dumps(chat_chunk) - for k, v in target_request.get('headers', {}).items(): - headers[k] = v + chunk = f'data: {data}' + + yield chunk for _ in range(5): - async with aiohttp.ClientSession(connector=proxies.default_proxy.connector) as session: + if is_chat: + target_request = await load_balancing.balance_chat_request(payload) + else: + target_request = await load_balancing.balance_organic_request(payload) + + headers = { + 'Content-Type': 'application/json' + } + + for k, v in target_request.get('headers', {}).items(): + headers[k] = v + + async with aiohttp.ClientSession(connector=proxies.random_proxy.connector) as session: async with session.request( method=target_request.get('method', 'POST'), url=target_request['url'], @@ -70,38 +88,63 @@ async def stream( try: await response.raise_for_status() except Exception as exc: - if 'Too Many Requests' in str(exc): - continue - else: - break + continue + # if 'Too Many Requests' in str(exc): - if user and incoming_request: - await logs.log_api_request( - user=user, - incoming_request=incoming_request, - target_url=target_request['url'] - ) + if user and incoming_request: + await logs.log_api_request( + user=user, + incoming_request=incoming_request, + target_url=target_request['url'] + ) - if credits_cost and user: - await users.update_by_id(user['_id'], { - '$inc': {'credits': -credits_cost} - }) + if credits_cost and user: + await users.update_by_id(user['_id'], { + '$inc': {'credits': -credits_cost} + }) - if not demo_mode: - await stats.add_date() - await stats.add_ip_address(network.get_ip(incoming_request)) - await stats.add_model(payload.get('model', '_non-chat')) - await stats.add_path(path) - await stats.add_target(target_request['url']) - await stats.add_tokens(input_tokens) + if not demo_mode: + ip_address = await network.get_ip(incoming_request) - async for chunk in response.content.iter_chunks(): - # chunk = f'{chunk.decode("utf8")}\n\n' + await stats.add_date() + await stats.add_ip_address(ip_address) + await stats.add_path(path) + await stats.add_target(target_request['url']) - if demo_mode: - print(chunk) + if is_chat: + await stats.add_model(model) + await stats.add_tokens(input_tokens, model) - yield chunk + async for chunk in response.content.iter_any(): + chunk = f'{chunk.decode("utf8")}\n\n' + + if chunk.strip(): + if is_chat: + if target_request['module'] == 'twa': + data = json.loads(chunk.split('data: ')[1]) + + if data.get('text'): + chat_chunk = chat.create_chat_chunk( + chat_id=chat_id, + model=model, + content=['text'] + ) + data = json.dumps(chat_chunk) + + chunk = f'data: {data}' + + yield chunk + break + if is_chat: + chat_chunk = chat.create_chat_chunk( + chat_id=chat_id, + model=model, + content=chat.CompletionStop + ) + data = json.dumps(chat_chunk) + + yield f'data: {data}' + yield 'data: [DONE]' if __name__ == '__main__': asyncio.run(stream()) diff --git a/api/transfer.py b/api/transfer.py index 04bdc20..e4a3e39 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -22,8 +22,6 @@ logging.basicConfig( format='%(asctime)s %(levelname)s %(name)s %(message)s' ) -logging.info('API started') - with open('config/credits.yml', encoding='utf8') as f: credits_config = yaml.safe_load(f)