diff --git a/.gitignore b/.gitignore
index 92a7b48..9d362aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,11 @@
+*.log.json
+/logs
+/log
+.log
+*.log.*
+
 providers/*
 providers/
-chat_providers/*
-chat_providers/
 secret/*
 secret/
diff --git a/api/config/credits.yml b/api/config/credits.yml
index e79cf8c..fa2449b 100644
--- a/api/config/credits.yml
+++ b/api/config/credits.yml
@@ -1,4 +1,5 @@
 max-credits: 100001
+max-credits-owner: 694201337
 start-credits: 1000
 
 costs:
diff --git a/api/core.py b/api/core.py
index 5895425..b763ed3 100644
--- a/api/core.py
+++ b/api/core.py
@@ -38,7 +38,7 @@ def new_user_webhook(user: dict) -> None:
     embed = Embed(
         description='New User',
         color=0x90ee90,
-    ) 
+    )
 
     embed.add_field(name='ID', value=user['_id'], inline=False)
     embed.add_field(name='Discord', value=user['auth']['discord'])
diff --git a/api/db/logs.py b/api/db/logs.py
index 06868ac..9a79286 100644
--- a/api/db/logs.py
+++ b/api/db/logs.py
@@ -12,7 +12,13 @@ def _get_mongo(collection_name: str):
     return AsyncIOMotorClient(os.getenv('MONGO_URI'))['nova-core'][collection_name]
 
 async def log_api_request(user: dict, incoming_request, target_url: str):
-    payload = await incoming_request.json()
+    payload = {}
+
+    try:
+        payload = await incoming_request.json()
+    except Exception as exc:
+        if 'JSONDecodeError' in str(exc):
+            pass
 
     last_prompt = None
     if 'messages' in payload:
diff --git a/api/helpers/errors.py b/api/helpers/errors.py
index 6978e39..d3846db 100644
--- a/api/helpers/errors.py
+++ b/api/helpers/errors.py
@@ -11,3 +11,10 @@ def error(code: int, message: str, tip: str) -> starlette.responses.Response:
     }}
 
     return starlette.responses.Response(status_code=code, content=json.dumps(info))
+
+def yield_error(code: int, message: str, tip: str) -> str:
+    return json.dumps({
+        'code': code,
+        'message': message,
+        'tip': tip
+    })
diff --git a/api/load_balancing.py b/api/load_balancing.py
index 263dfe2..c6d30bd 100644
--- a/api/load_balancing.py
+++ b/api/load_balancing.py
@@ -1,14 +1,14 @@
 import random
 import asyncio
 
-import chat_providers
+import providers
 
 provider_modules = [
-    # chat_providers.twa,
-    # chat_providers.quantum,
-    chat_providers.churchless,
-    chat_providers.closed,
-    chat_providers.closed4
+    # providers.twa,
+    # providers.quantum,
+    providers.churchless,
+    providers.closed,
+    providers.closed4
 ]
 
 def _get_module_name(module) -> str:
diff --git a/api/streaming.py b/api/streaming.py
index df3011e..8dba622 100644
--- a/api/streaming.py
+++ b/api/streaming.py
@@ -1,6 +1,6 @@
 import os
-import yaml
 import json
+import dhooks
 import asyncio
 import aiohttp
 import starlette
@@ -13,7 +13,7 @@
 import proxies
 import load_balancing
 
 from db import logs, users, stats
-from helpers import network, chat
+from helpers import network, chat, errors
 
 load_dotenv()
@@ -27,9 +27,6 @@ DEMO_PAYLOAD = {
     ]
 }
 
-with open('config/credits.yml', encoding='utf8') as f:
-    max_credits = yaml.safe_load(f)['max-credits']
-
 async def stream(
     path: str='/v1/chat/completions',
     user: dict=None,
@@ -42,12 +39,16 @@
     payload = payload or DEMO_PAYLOAD
 
     is_chat = False
+    is_stream = payload.get('stream', False)
 
     if 'chat/completions' in path:
         is_chat = True
-        chat_id = await chat.create_chat_id()
         model = payload['model']
+
+    if is_chat and is_stream:
+        chat_id = await chat.create_chat_id()
+
 
         yield chat.create_chat_chunk(
             chat_id=chat_id,
             model=model,
@@ -60,25 +61,38 @@
             content=None
         )
 
-    for _ in range(5):
+    for _ in range(3):
         headers = {
             'Content-Type': 'application/json'
         }
 
-        if is_chat:
-            target_request = await load_balancing.balance_chat_request(payload)
-        else:
-            target_request = await load_balancing.balance_organic_request({
-                'path': path,
-                'payload': payload,
-                'headers': headers
-            })
+        try:
+            if is_chat:
+                target_request = await load_balancing.balance_chat_request(payload)
+            else:
+                target_request = await load_balancing.balance_organic_request({
+                    'method': incoming_request.method,
+                    'path': path,
+                    'payload': payload,
+                    'headers': headers,
+                    'cookies': incoming_request.cookies
+                })
+        except ValueError as exc:
+            webhook = dhooks.Webhook(os.getenv('DISCORD_WEBHOOK__API_ISSUE'))
+
+            webhook.send(content=f'API Issue: **`{exc}`**\nhttps://i.imgflip.com/7uv122.jpg')
+            yield errors.yield_error(
+                500,
+                'Sorry, the API has no working keys anymore.',
+                'The admins have been messaged automatically.'
+            )
 
         for k, v in target_request.get('headers', {}).items():
            headers[k] = v
 
-        async with aiohttp.ClientSession(connector=proxies.default_proxy.connector) as session:
+        json.dump(target_request, open('api.log.json', 'w'), indent=4)
+        async with aiohttp.ClientSession(connector=proxies.default_proxy.connector) as session:
             try:
                 async with session.request(
                     method=target_request.get('method', 'POST'),
@@ -94,6 +108,7 @@
                     timeout=aiohttp.ClientTimeout(total=float(os.getenv('TRANSFER_TIMEOUT', '120'))),
                 ) as response:
+                    print(5)
                     try:
                         response.raise_for_status()
@@ -114,28 +129,44 @@
                         '$inc': {'credits': -credits_cost}
                     })
+                    print(6)
 
-                    try:
-                        async for chunk in response.content.iter_any():
-                            chunk = f'{chunk.decode("utf8")}\n\n'
+                    if is_stream:
+                        try:
+                            async for chunk in response.content.iter_any():
+                                send = False
+                                chunk = f'{chunk.decode("utf8")}\n\n'
+                                chunk = chunk.replace(os.getenv('MAGIC_WORD', 'novaOSScheckKeyword'), payload['model'])
+                                # chunk = chunk.replace(os.getenv('MAGIC_USER_WORD', 'novaOSSuserKeyword'), user['_id'])
 
-                            if chunk.strip():
-                                if is_chat:
-                                    if target_request['module'] == 'twa':
-                                        data = json.loads(chunk.split('data: ')[1])
+                                if not chunk.strip():
+                                    send = False
 
-                                        if data.get('text'):
-                                            chunk = chat.create_chat_chunk(
-                                                chat_id=chat_id,
-                                                model=model,
-                                                content=['text']
-                                            )
-                            yield chunk
+                                if is_chat and '{' in chunk:
+                                    data = json.loads(chunk.split('data: ')[1])
+                                    send = True
+                                    print(data)
 
-                    except Exception as exc:
-                        if 'Connection closed' in str(exc):
-                            print('connection closed')
-                            continue
+                                    if target_request['module'] == 'twa' and data.get('text'):
+                                        chunk = chat.create_chat_chunk(
+                                            chat_id=chat_id,
+                                            model=model,
+                                            content=['text']
+                                        )
+
+                                    if not data['choices'][0]['delta']:
+                                        send = False
+
+                                    if data['choices'][0]['delta'] == {'role': 'assistant'}:
+                                        send = False
+
+                                if send:
+                                    yield chunk
+
+                        except Exception as exc:
+                            if 'Connection closed' in str(exc):
+                                print('connection closed: ', exc)
+                                continue
 
                 if not demo_mode:
                     ip_address = await network.get_ip(incoming_request)
@@ -151,12 +182,13 @@
 
                 break
 
-            except ProxyError:
+            except ProxyError as exc:
                 print('proxy error')
                 continue
 
     print(3)
-    if is_chat:
+
+    if is_chat and is_stream:
         chat_chunk = chat.create_chat_chunk(
             chat_id=chat_id,
             model=model,
@@ -166,5 +198,9 @@
 
         yield 'data: [DONE]\n\n'
 
+    if not is_stream:
+        json_response = await response.json()
+        yield json_response.encode('utf8')
+
 if __name__ == '__main__':
     asyncio.run(stream())
diff --git a/api/transfer.py b/api/transfer.py
index e4a3e39..58b7fb7 100644
--- a/api/transfer.py
+++ b/api/transfer.py
@@ -87,7 +87,7 @@ async def handle(incoming_request):
 
     payload['user'] = str(user['_id'])
 
-    if not payload.get('stream') is True:
+    if 'chat/completions' in path and not payload.get('stream') is True:
         payload['stream'] = False
 
     return starlette.responses.StreamingResponse(
@@ -99,5 +99,5 @@
             input_tokens=input_tokens,
             incoming_request=incoming_request,
         ),
-        media_type='text/event-stream'
+        media_type='text/event-stream' if payload.get('stream', False) else 'application/json'
     )
diff --git a/tests/__main__.py b/tests/__main__.py
index 80b0792..7df796e 100644
--- a/tests/__main__.py
+++ b/tests/__main__.py
@@ -42,7 +42,7 @@ def test_api(model: str=MODEL, messages: List[dict]=None) -> dict:
 
     headers = {
         'Content-Type': 'application/json',
-        'Authorization': 'Bearer ' + api_key
+        'Authorization': 'Bearer ' + closedai.api_key
     }
 
     json_data = {
@@ -52,7 +52,7 @@
     }
 
     response = httpx.post(
-        url=f'{api_endpoint}/v1/chat/completions',
+        url=f'{api_endpoint}/chat/completions',
         headers=headers,
         json=json_data,
         timeout=20
@@ -64,9 +64,6 @@
 def test_library():
     """Tests if the api_endpoint is working with the Python library."""
 
-    closedai.api_base = api_endpoint
-    closedai.api_key = api_key
-
     completion = closedai.ChatCompletion.create(
         model=MODEL,
         messages=MESSAGES,
@@ -79,23 +76,27 @@
     except:
         print('-')
 
+def test_library_moderation():
+    return closedai.Moderation.create("I wanna kill myself, I wanna kill myself; It's all I hear right now, it's all I hear right now")
+
 def test_all():
     """Runs all tests."""
 
     # print(test_server())
-    # print(test_api())
-    print(test_library())
+    print(test_api())
+    # print(test_library())
+    # print(test_library_moderation())
 
-def test_api(model: str=MODEL, messages: List[dict]=None) -> dict:
+def test_api_moderation(model: str=MODEL, messages: List[dict]=None) -> dict:
     """Tests an API api_endpoint."""
 
     headers = {
-        'Authorization': 'Bearer ' + api_key
+        'Authorization': 'Bearer ' + closedai.api_key
     }
 
     response = httpx.get(
-        url=f'{api_endpoint}/v1/usage',
+        url=f'{api_endpoint}/moderations',
        headers=headers,
        timeout=20
    )
@@ -104,9 +105,8 @@
     return response.text
 
 if __name__ == '__main__':
-    # api_endpoint = 'https://api.nova-oss.com'
-    api_endpoint = 'https://alpha-api.nova-oss.com'
-    api_key = os.getenv('TEST_NOVA_KEY')
-    # test_all()
+    api_endpoint = 'https://alpha-api.nova-oss.com/v1'
+    closedai.api_base = api_endpoint
+    closedai.api_key = os.getenv('TEST_NOVA_KEY')
 
-    print(test_api())
+    test_all()
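
For reference, below is a minimal client sketch (not part of the patch) that exercises the new stream/non-stream behaviour introduced in api/transfer.py: with stream omitted or set to false, the response should now come back as application/json instead of text/event-stream. The endpoint URL and the TEST_NOVA_KEY variable are taken from tests/__main__.py; the model name and message text are placeholders.

# Client sketch, assuming the alpha endpoint and TEST_NOVA_KEY from tests/__main__.py.
# The model name and message below are placeholders, not values from the patch.
import os
import httpx

api_endpoint = 'https://alpha-api.nova-oss.com/v1'

headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer ' + os.getenv('TEST_NOVA_KEY', '')
}

payload = {
    'model': 'gpt-3.5-turbo',  # placeholder model name
    'messages': [{'role': 'user', 'content': 'Hello!'}],
    'stream': False  # non-streaming: transfer.py should now return application/json
}

response = httpx.post(f'{api_endpoint}/chat/completions', headers=headers, json=payload, timeout=20)
print(response.headers.get('content-type'))  # expected: application/json
print(response.text)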