From 23533b4aa339096c389265dac0248f5732c331a2 Mon Sep 17 00:00:00 2001 From: nsde Date: Fri, 30 Jun 2023 02:49:56 +0200 Subject: [PATCH] First actually working version I guess (streaming support) --- api/main.py | 15 +++-------- api/proxies.py | 8 +++++- api/transfer.py | 63 +++++++++++++++++++++++++++++++++++++---------- requirements.txt | 2 ++ tests/__main__.py | 3 ++- 5 files changed, 64 insertions(+), 27 deletions(-) diff --git a/api/main.py b/api/main.py index 9774aa7..3fb50db 100644 --- a/api/main.py +++ b/api/main.py @@ -26,8 +26,8 @@ app.add_middleware( async def startup_event(): """Read up the API server.""" - security.enable_proxy() - security.ip_protection_check() + # security.enable_proxy() + # security.ip_protection_check() @app.get('/') async def root(): @@ -39,13 +39,4 @@ async def root(): 'github': 'https://github.com/Luna-OSS' } -async def _reverse_proxy(request: Request): - headers = { - name: value - for name, value in target_response.headers.items() - if name.lower() not in EXCLUDED_HEADERS - } - - # ... - -app.add_route('/{path:path}', _reverse_proxy, ['GET', 'POST', 'PUT', 'DELETE', 'PATCH']) +app.add_route('/{path:path}', transfer.transfer_streaming_response, ['GET', 'POST', 'PUT', 'DELETE', 'PATCH']) diff --git a/api/proxies.py b/api/proxies.py index 56c2768..ea30520 100644 --- a/api/proxies.py +++ b/api/proxies.py @@ -40,6 +40,7 @@ class Proxy: """ return self.proxy_type# + 'h' if self.proxy_type.startswith('socks') else self.proxy_type + @property def proxies(self): """Returns a dictionary of proxies, ready to be used with the requests library or httpx. """ @@ -53,6 +54,11 @@ class Proxy: return proxies_dict + @property + def url(self): + """Returns the proxy URL.""" + return f'{self.protocol}://{self.auth}{self.host}:{self.port}' + def __str__(self): return f'{self.proxy_type}://{len(self.auth) * "*"}{self.host}:{self.port}' @@ -83,7 +89,7 @@ def check_proxy(): resp = httpx.get( url='https://echo.hoppscotch.io/', timeout=20, - proxies=active_proxy.proxies() + proxies=active_proxy.proxies ) resp.raise_for_status() diff --git a/api/transfer.py b/api/transfer.py index 376d24d..8bf7bc6 100644 --- a/api/transfer.py +++ b/api/transfer.py @@ -1,5 +1,10 @@ +"""Module for transferring requests to ClosedAI API""" + import os -import httpx +import aiohttp +import aiohttp_socks + +import proxies from dotenv import load_dotenv @@ -15,16 +20,48 @@ EXCLUDED_HEADERS = [ 'connection' ] -async def stream_api_response(request, target_endpoint: str='https://api.openai.com/v1'): - async with httpx.AsyncClient(timeout=120) as client: - async with client.stream( - method=request.method, - url=f'{target_endpoint}/{request.url.path}', - headers={ - 'Authorization': 'Bearer ' + os.getenv('CLOSEDAI_KEY'), - 'Content-Type': 'application/json' - }, - data=await request.body(), - ) as target_response: - target_response.raise_for_status() +proxy = proxies.active_proxy +proxy_connector = aiohttp_socks.ProxyConnector( + proxy_type=aiohttp_socks.ProxyType.SOCKS5, + host=proxy.ip_address, + port=proxy.port, + rdns=False, + username=proxy.username, + password=proxy.password +) +async def transfer_streaming_response(incoming_request, target_endpoint: str='https://api.openai.com/v1'): + """Transfer a streaming response from the incoming request to the target endpoint""" + + incoming_json_payload = await incoming_request.json() + + async def receive_target_stream(): + connector = aiohttp_socks.ProxyConnector( + proxy_type=aiohttp_socks.ProxyType.SOCKS5, + host=proxy.ip_address, + port=proxy.port, + rdns=False, + username=proxy.username, + password=proxy.password + ) + async with aiohttp.ClientSession( + connector=connector, + timeout=aiohttp.ClientTimeout(total=120), + raise_for_status=True + ) as session: + async with session.request( + method=incoming_request.method, + url=f'{target_endpoint}/{incoming_request.url.path}', + json=incoming_json_payload, + headers={ + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {os.getenv("CLOSEDAI_KEY")}' + } + ) as response: + async for chunk in response.content.iter_any(): + chunk = f'{chunk.decode("utf8")}\n\n' + yield chunk + + return StreamingResponse( + content=receive_target_stream() + ) diff --git a/requirements.txt b/requirements.txt index 6938018..d737da3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,5 @@ python-dotenv rich starlette win_inet_pton +aiohttp-socks +aiohttp diff --git a/tests/__main__.py b/tests/__main__.py index e3ffe37..a0ea5e7 100644 --- a/tests/__main__.py +++ b/tests/__main__.py @@ -35,12 +35,13 @@ def test_api(model: str=MODEL, messages: List[dict]=None) -> dict: json_data = { 'model': model, 'messages': messages or MESSAGES, + 'stream': True, } response = httpx.post(f'{ENDPOINT}/chat/completions', headers=headers, json=json_data, timeout=20) response.raise_for_status() - return response.json()['choices'][0] + return response def test_library(): """Tests if the endpoint is working with the "Closed"AI library."""