nova-api/api/streaming.py

202 lines
7.3 KiB
Python
Raw Normal View History

2023-08-12 17:49:31 +02:00
"""This module contains the streaming logic for the API."""
2023-08-04 03:30:56 +02:00
import os
2023-08-04 17:29:49 +02:00
import json
2023-08-18 21:23:00 +02:00
import yaml
import dhooks
2023-08-04 03:30:56 +02:00
import asyncio
import aiohttp
import starlette
2023-08-04 17:29:49 +02:00
from rich import print
2023-08-04 03:30:56 +02:00
from dotenv import load_dotenv
2023-08-05 02:30:42 +02:00
from python_socks._errors import ProxyError
2023-08-04 03:30:56 +02:00
2023-08-18 21:23:00 +02:00
import chunks
2023-08-04 03:30:56 +02:00
import proxies
import provider_auth
2023-08-04 03:30:56 +02:00
import load_balancing
from db import logs
from db.users import UserManager
from db.stats import StatsManager
from helpers import network, chat, errors
2023-08-04 03:30:56 +02:00
load_dotenv()
2023-08-14 10:47:03 +02:00
## Loads config which contains rate limits
with open('config/config.yml', encoding='utf8') as config_file:
    config = yaml.safe_load(config_file)

## Where all rate limit requested data will be stored.
# Rate limit data is **not persistent** (It will be deleted on server stop/restart).
user_last_request_time = {}
2023-08-14 10:47:03 +02:00
2023-08-04 03:30:56 +02:00
# Minimal chat-completion body; used by the __main__ smoke test below.
DEMO_PAYLOAD = {
    'model': 'gpt-3.5-turbo',
    'messages': [{'role': 'user', 'content': '1+1='}],
}
async def stream(
    path: str='/v1/chat/completions',
    user: dict=None,
    payload: dict=None,
    credits_cost: int=0,
    input_tokens: int=0,
    incoming_request: starlette.requests.Request=None,
):
    """Stream the completions request. Sends data in chunks

    If not streaming, it sends the result in its entirety.

    Args:
        path: upstream API path; a path containing 'chat/completions' is
            treated as a chat request, anything else as an "organic" request.
        user: user document (used for request logging and credit deduction).
        payload: JSON body to forward to the provider.
        credits_cost: credits to subtract from the user after the request.
        input_tokens: prompt token count, recorded in the stats DB.
        incoming_request: the original client request (method/cookies/IP).

    Yields:
        SSE chunk strings when streaming, otherwise one JSON string.
    """
    ## Setup managers
    db = UserManager()
    stats = StatsManager()

    # Guard against being called without a body (the old code crashed on
    # payload.get when payload was None).
    payload = payload or {}

    is_chat = False
    is_stream = payload.get('stream', False)

    # Defaults so the streaming branch below cannot hit a NameError when the
    # request is organic (non-chat) but still streamed.
    chat_id = None
    model = None

    if 'chat/completions' in path:
        is_chat = True
        model = payload['model']

    if is_chat and is_stream:
        chat_id = await chat.create_chat_id()
        yield await chat.create_chat_chunk(chat_id=chat_id, model=model, content=chat.CompletionStart)
        yield await chat.create_chat_chunk(chat_id=chat_id, model=model, content=None)

    json_response = {'error': 'No JSON response could be received'}

    # Retry with a freshly balanced provider on each attempt.
    for _ in range(5):
        headers = {'Content-Type': 'application/json'}

        # Load balancing: randomly selecting a suitable provider
        # If the request is a chat completion, then we need to load balance between chat providers
        # If the request is an organic request, then we need to load balance between organic providers
        try:
            if is_chat:
                target_request = await load_balancing.balance_chat_request(payload)
            else:
                # In this case we are doing a organic request. "organic" means that it's not using a reverse engineered front-end, but rather ClosedAI's API directly
                # churchless.tech is an example of an organic provider, because it redirects the request to ClosedAI.
                target_request = await load_balancing.balance_organic_request({
                    'method': incoming_request.method,
                    'path': path,
                    'payload': payload,
                    'headers': headers,
                    'cookies': incoming_request.cookies
                })
        except ValueError as exc:
            # No usable provider key remains: alert the admins, tell the client.
            webhook = dhooks.Webhook(os.environ['DISCORD_WEBHOOK__API_ISSUE'])
            webhook.send(content=f'API Issue: **`{exc}`**\nhttps://i.imgflip.com/7uv122.jpg')
            yield await errors.yield_error(500, 'Sorry, the API has no working keys anymore.', 'The admins have been messaged automatically.')
            return

        # Merge the default headers with the balancer's headers; the balancer's
        # entries win on conflict. (Previously this updated the dict with
        # itself — a no-op — and raised KeyError when 'headers' was missing.)
        target_request['headers'] = {**headers, **target_request.get('headers', {})}

        if target_request['method'] == 'GET' and not payload:
            target_request['payload'] = None

        # We haven't done any requests as of right now, everything until now was just preparation
        # Here, we process the request
        async with aiohttp.ClientSession(connector=proxies.get_proxy().connector) as session:
            # Safety check: make sure the proxy does not leak one of our real IPs.
            try:
                async with session.get(
                    url='https://checkip.amazonaws.com',
                    timeout=aiohttp.ClientTimeout(
                        connect=3,
                        total=float(os.getenv('TRANSFER_TIMEOUT', '5'))
                    )
                ) as response:
                    # Read the body once; the old code re-called response.text()
                    # without awaiting it, putting a coroutine repr in the error.
                    detected_ip = await response.text()
                    for actual_ip in os.getenv('ACTUAL_IPS', '').split(' '):
                        # Skip empty tokens so an unset ACTUAL_IPS doesn't match everything.
                        if actual_ip and actual_ip in detected_ip:
                            raise ValueError(f'Proxy {detected_ip} is transparent!')
            except Exception as exc:
                print(f'[!] proxy {proxies.get_proxy()} error - ({type(exc)} {exc})')
                continue

            try:
                async with session.request(
                    method=target_request.get('method', 'POST'),
                    url=target_request['url'],
                    data=target_request.get('data'),
                    json=target_request.get('payload'),
                    headers=target_request.get('headers', {}),
                    cookies=target_request.get('cookies'),
                    ssl=False,
                    timeout=aiohttp.ClientTimeout(
                        connect=60,
                        total=float(os.getenv('TRANSFER_TIMEOUT', '120'))
                    ),
                ) as response:
                    if response.status == 429:
                        # Provider rate limit — try another provider.
                        continue

                    if response.content_type == 'application/json':
                        data = await response.json()

                        if data.get('code') == 'invalid_api_key':
                            # Dead key: invalidate it and retry elsewhere.
                            await provider_auth.invalidate_key(target_request.get('provider_auth'))
                            continue

                        if response.ok:
                            json_response = data

                    if is_stream:
                        try:
                            response.raise_for_status()
                        except Exception as exc:
                            if 'Too Many Requests' in str(exc):
                                continue

                        async for chunk in chunks.process_chunks(
                            chunks=response.content.iter_any(),
                            is_chat=is_chat,
                            chat_id=chat_id,
                            model=model,
                            target_request=target_request
                        ):
                            yield chunk

                    # Success — stop retrying.
                    break

            except ProxyError:
                print('[!] aiohttp came up with a dumb excuse to not work again ("pRoXy ErRor")')
                continue

            except ConnectionResetError:
                print('[!] aiohttp came up with a dumb excuse to not work again ("cOnNeCtIoN rEsEt")')
                continue

    if is_chat and is_stream:
        yield await chat.create_chat_chunk(chat_id=chat_id, model=model, content=chat.CompletionStop)
        yield 'data: [DONE]\n\n'

    if not is_stream and json_response:
        yield json.dumps(json_response)

    ## Accounting: request log, credit deduction, statistics.
    if user and incoming_request:
        await logs.log_api_request(user=user, incoming_request=incoming_request, target_url=target_request['url'])

    if credits_cost and user:
        await db.update_by_id(user['_id'], {'$inc': {'credits': -credits_cost}})

    ip_address = await network.get_ip(incoming_request)
    await stats.add_date()
    await stats.add_ip_address(ip_address)
    await stats.add_path(path)
    await stats.add_target(target_request['url'])

    if is_chat:
        await stats.add_model(model)
        await stats.add_tokens(input_tokens, model)
2023-08-04 03:30:56 +02:00
if __name__ == '__main__':
    # `stream` is an async *generator*, so it cannot be handed to asyncio.run()
    # directly (asyncio.run expects a coroutine and would raise ValueError).
    # Drive it with a small coroutine that exhausts the generator, using the
    # demo payload so payload-dependent code paths have something to work on.
    async def _demo() -> None:
        async for _ in stream(payload=DEMO_PAYLOAD):
            pass

    asyncio.run(_demo())