""" This module contains the streaming logic for the API. """
import os
import json
import asyncio
import datetime

import yaml
import dhooks
import aiohttp
import starlette

from rich import print
from dotenv import load_dotenv
from python_socks._errors import ProxyError

import proxies
import provider_auth
import load_balancing

from db import logs
from db.users import UserManager
from db.stats import StatsManager

from helpers import network, chat, errors

load_dotenv()
## Loads the config, which contains the rate limits
with open('config/credits.yml', encoding='utf8') as f:
    config = yaml.safe_load(f)
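# A minimal sketch of what config/credits.yml is assumed to look like, judging
# by how `config` is read in the (disabled) rate limiter inside stream() below;
# the real file may differ:
#
#   roles:
#     default:
#       rate_limit:
#         gpt-3.5-turbo: 10   # minimum seconds between two requests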
## Where all rate-limit request data will be stored.
# Rate limit data is **not persistent** (it will be deleted on server stop/restart).
# user_last_request_time = {}
DEMO_PAYLOAD = {
    'model': 'gpt-3.5-turbo',
    'messages': [
        {
            'role': 'user',
            'content': '1+1='
        }
    ]
}
async def process_response(response, is_chat, chat_id, model, target_request):
""" Proccesses chunks from streaming
Args :
response ( _type_ ) : The response
is_chat ( bool ) : If there is ' chat/completions ' in path
chat_id ( _type_ ) : ID of chat with bot
model ( _type_ ) : What AI model it is
"""
    async for chunk in response.content.iter_any():
        chunk = chunk.decode('utf8').strip()
        send = False

        if is_chat and '{' in chunk:
            data = json.loads(chunk.split('data: ')[1])
            chunk = chunk.replace(data['id'], chat_id)
            send = True
            if target_request['module'] == 'twa' and data.get('text'):
                # Providers served through the 'twa' module send plain text; re-wrap it
                # as a standard chat completion chunk.
                chunk = await chat.create_chat_chunk(chat_id=chat_id, model=model, content=data['text'])

            if (not data['choices'][0]['delta']) or data['choices'][0]['delta'] == {'role': 'assistant'}:
                send = False

        if send and chunk:
            yield chunk + '\n\n'
async def stream(
    path: str = '/v1/chat/completions',
    user: dict = None,
    payload: dict = None,
    credits_cost: int = 0,
    input_tokens: int = 0,
    incoming_request: starlette.requests.Request = None,
):
    """Streams the completions request, sending the data in chunks.

    If not streaming, it sends the result in its entirety.

    Args:
        path (str, optional): URL path. Defaults to '/v1/chat/completions'.
        user (dict, optional): User object (dict). Defaults to None.
        payload (dict, optional): Payload. Defaults to None.
        credits_cost (int, optional): Credits cost of the request. Defaults to 0.
        input_tokens (int, optional): Total tokens calculated with the tokenizer. Defaults to 0.
        incoming_request (starlette.requests.Request, optional): Incoming request. Defaults to None.
    """
    ## Rate-limits the user.
    # If the rate limit is exceeded, error code 429 is returned. Otherwise, the user is
    # let through, and the time of the request is noted down for future requests.
    # if user:
    #     role = user.get('role', 'default')
    #     rate_limit = config['roles'][role]['rate_limit'].get(payload['model'], 10)
    #     last_request_time = user_last_request_time.get(user['_id'])
    #     if last_request_time and (datetime.datetime.now() - last_request_time) < datetime.timedelta(seconds=rate_limit):
    #         yield await errors.yield_error(429, 'Rate limit exceeded', "You are making requests too quickly. Please wait and try again later. Ask an administrator if you think this shouldn't happen.")
    #         return
    #     user_last_request_time[user['_id']] = datetime.datetime.now()
    ## Set up the managers
    db = UserManager()
    stats = StatsManager()
    is_chat = False
    is_stream = payload.get('stream', False)

    if 'chat/completions' in path:
        is_chat = True
        model = payload['model']
    if is_chat and is_stream:
        chat_id = await chat.create_chat_id()
        yield await chat.create_chat_chunk(chat_id=chat_id, model=model, content=chat.CompletionStart)
        yield await chat.create_chat_chunk(chat_id=chat_id, model=model, content=None)

    json_response = {'error': 'No JSON response could be received'}
    for _ in range(5):
        headers = {'Content-Type': 'application/json'}

        # Load balancing:
        # If the request is a chat completion, we load balance between chat providers.
        # If the request is an organic request, we load balance between organic providers.
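        # Judging by how `target_request` is consumed below, both balancers are
        # assumed to return a dict shaped roughly like this (illustrative):
        #   {'method': 'POST', 'url': 'https://<provider>/v1/chat/completions',
        #    'payload': {...}, 'headers': {...}, 'cookies': {...},
        #    'provider_auth': '<key>', 'module': '<provider module name>'}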
        try:
            if is_chat:
                target_request = await load_balancing.balance_chat_request(payload)
            else:
                # In this case, we are doing an organic request. "Organic" means that it's not
                # using a reverse-engineered front-end, but rather ClosedAI's API directly.
                # churchless.tech is an example of an organic provider, because it redirects
                # the request to ClosedAI.
                target_request = await load_balancing.balance_organic_request({
                    'method': incoming_request.method,
                    'path': path,
                    'payload': payload,
                    'headers': headers,
                    'cookies': incoming_request.cookies
                })
        except ValueError as exc:
            webhook = dhooks.Webhook(os.getenv('DISCORD_WEBHOOK__API_ISSUE'))
            webhook.send(content=f'API Issue: **`{exc}`**\nhttps://i.imgflip.com/7uv122.jpg')
            yield await errors.yield_error(500, 'Sorry, the API has no working keys anymore.', 'The admins have been messaged automatically.')
            return
        # Merge the default headers into the target request's headers without
        # clobbering any provider-specific ones.
        target_request['headers'] = {**headers, **target_request.get('headers', {})}

        if target_request['method'] == 'GET' and not payload:
            target_request['payload'] = None
        # We haven't made any requests yet; everything until now was just preparation.
        # Here, we actually process the request.
        async with aiohttp.ClientSession(connector=proxies.get_proxy().connector) as session:
            try:
                async with session.request(
                    method=target_request.get('method', 'POST'),
                    url=target_request['url'],
                    data=target_request.get('data'),
                    json=target_request.get('payload'),
                    headers=target_request.get('headers', {}),
                    cookies=target_request.get('cookies'),
                    ssl=False,
                    timeout=aiohttp.ClientTimeout(
                        connect=3.0,
                        total=float(os.getenv('TRANSFER_TIMEOUT', '120'))
                    ),
                ) as response:
                    if response.content_type == 'application/json':
                        data = await response.json()

                        if data.get('code') == 'invalid_api_key':
                            await provider_auth.invalidate_key(target_request.get('provider_auth'))
                            continue

                        if response.ok:
                            json_response = data
                    if is_stream:
                        try:
                            response.raise_for_status()
                        except Exception as exc:
                            if 'Too Many Requests' in str(exc):
                                continue

                        async for chunk in process_response(response, is_chat, chat_id, model, target_request):
                            yield chunk

                    break
            except ProxyError as exc:
                print('[!] Proxy error:', exc)
                continue
    if is_chat and is_stream:
        yield await chat.create_chat_chunk(chat_id=chat_id, model=model, content=chat.CompletionStop)
        yield 'data: [DONE]\n\n'
    if not is_stream and json_response:
        yield json.dumps(json_response)

    if user and incoming_request:
        await logs.log_api_request(user=user, incoming_request=incoming_request, target_url=target_request['url'])
    if credits_cost and user:
        # '$inc' with a negative value decrements the user's credits
        # (a MongoDB-style atomic update).
        await db.update_by_id(user['_id'], {'$inc': {'credits': -credits_cost}})

    ip_address = await network.get_ip(incoming_request)

    await stats.add_date()
    await stats.add_ip_address(ip_address)
    await stats.add_path(path)
    await stats.add_target(target_request['url'])

    if is_chat:
        await stats.add_model(model)
        await stats.add_tokens(input_tokens, model)
if __name__ == '__main__':
    # `stream` is an async generator, so it can't be passed to asyncio.run directly;
    # wrap it in a small coroutine that drains the demo request.
    async def demo():
        async for chunk in stream(payload=DEMO_PAYLOAD):
            print(chunk)

    asyncio.run(demo())
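# A minimal sketch of how this generator might be plugged into a FastAPI route
# (illustrative; `app`, `Request`, and the route itself are assumptions, as the
# real wiring lives outside this module):
#
#   from fastapi import Request
#   from fastapi.responses import StreamingResponse
#
#   @app.post('/v1/chat/completions')
#   async def chat_completions(request: Request):
#       payload = await request.json()
#       return StreamingResponse(
#           stream(payload=payload, incoming_request=request),
#           media_type='text/event-stream'
#       )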