diff --git a/api/db/logs.py b/api/db/logs.py index 6b2512b..fde9cec 100644 --- a/api/db/logs.py +++ b/api/db/logs.py @@ -1,6 +1,5 @@ import os -import bson -import datetime +import time from dotenv import load_dotenv from motor.motor_asyncio import AsyncIOMotorClient @@ -22,7 +21,7 @@ async def log_api_request(user, request, target_url): model = payload['model'] new_log_item = { - 'timestamp': bson.timestamp.Timestamp(datetime.datetime.now(), 0), + 'timestamp': time.time(), 'method': request.method, 'path': request.url.path, 'user_id': user['_id'], diff --git a/api/netclient.py b/api/netclient.py index 181ee8a..0d79c45 100644 --- a/api/netclient.py +++ b/api/netclient.py @@ -1,5 +1,7 @@ import os -import requests +import aiohttp +import asyncio +import aiohttp_socks from dotenv import load_dotenv @@ -9,7 +11,7 @@ from helpers import exceptions load_dotenv() -async def stream(request: dict): +async def stream(request: dict, demo_mode: bool=False): headers = { 'Content-Type': 'application/json' } @@ -18,27 +20,42 @@ async def stream(request: dict): headers[k] = v for _ in range(3): - response = requests.request( - method=request.get('method', 'POST'), - url=request['url'], - json=request.get('payload', {}), - headers=headers, - timeout=int(os.getenv('TRANSFER_TIMEOUT', '120')), - proxies=proxies.default_proxy.urls, - stream=True - ) + async with aiohttp.ClientSession(connector=proxies.default_proxy.connector) as session: + async with session.get( + # 'GET', + 'https://checkip.amazonaws.com/' + ) as response: + print(response.content) + print(type(response.content)) - try: - response.raise_for_status() - except Exception as exc: - if str(exc) == '429 Client Error: Too Many Requests for url: https://api.openai.com/v1/chat/completions': - continue - else: - break + # html = await response.text() + # print(html) - for chunk in response.iter_lines(): - chunk = f'{chunk.decode("utf8")}\n\n' - yield chunk + # async with session.get( + # method='GET', + # url='https://checkip.amazonaws.com', + # method=request.get('method', 'POST'), + # url=request['url'], + # json=request.get('payload', {}), + # headers=headers, + # timeout=aiohttp.ClientTimeout(total=float(os.getenv('TRANSFER_TIMEOUT', '120'))), + # ) as response: + # try: + # await response.raise_for_status() + # except Exception as exc: + # if 'Too Many Requests' in str(exc): + # continue + # else: + # break + + async for chunk in response.content.iter_chunks(): + # chunk = f'{chunk.decode("utf8")}\n\n' + + if demo_mode: + print(chunk) + + yield chunk if __name__ == '__main__': - pass + asyncio.run(stream({'method': 'GET', 'url': 'https://checkip.amazonaws.com'}, True)) + diff --git a/api/proxies.py b/api/proxies.py index 70d90ad..10f5e33 100644 --- a/api/proxies.py +++ b/api/proxies.py @@ -52,7 +52,8 @@ class Proxy: print(f'Detected IP: {detected_ip}') return detected_ip.strip() - async def get_connector(self): + @property + def connector(self): proxy_types = { 'http': aiohttp_socks.ProxyType.HTTP, 'https': aiohttp_socks.ProxyType.HTTP, @@ -60,7 +61,7 @@ class Proxy: 'socks5': aiohttp_socks.ProxyType.SOCKS5 } - connector = aiohttp_socks.ProxyConnector( + return aiohttp_socks.ProxyConnector( proxy_type=proxy_types[self.proxy_type], host=self.host, port=self.port, @@ -69,10 +70,6 @@ class Proxy: password=self.password ) - await self.initialize_connector(connector) - - return connector - default_proxy = Proxy( proxy_type=os.getenv('PROXY_TYPE', 'http'), host_or_ip=os.getenv('PROXY_HOST', '127.0.0.1'), @@ -81,16 +78,6 @@ default_proxy = Proxy( password=os.getenv('PROXY_PASS') ) -def test_httpx(): - import httpx - - print(default_proxy.proxies) - - with httpx.Client( - # proxies=default_proxy.proxies - ) as client: - return client.get('https://checkip.amazonaws.com').text.strip() - def test_httpx_workaround(): import httpx @@ -113,6 +100,13 @@ def test_requests(): proxies=default_proxy.urls ).text.strip() +async def test_aiohttp_socks(): + async with aiohttp.ClientSession(connector=default_proxy.connector) as session: + async with session.get('https://checkip.amazonaws.com/') as response: + html = await response.text() + return html.strip() + if __name__ == '__main__': - print(test_httpx()) + # print(test_httpx()) # print(test_requests()) + print(asyncio.run(test_aiohttp_socks())) diff --git a/api/transfer.py b/api/transfer.py index 9fef647..1ef3545 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -89,4 +89,6 @@ async def handle(incoming_request): print(target_request['url']) + return errors.error(500, 'Sorry, the API is currenly under maintainance.', 'Please try again later.') + return starlette.responses.StreamingResponse(netclient.stream(target_request))