diff --git a/README.md b/README.md
index 34bc61e..a9045bf 100644
--- a/README.md
+++ b/README.md
@@ -77,21 +77,14 @@ Create a `.env` file, make sure not to reveal it to anyone, and fill in the requ
 - `PROXY_USER` (optional)
 - `PROXY_PASS` (optional)
 
-### ClosedAI configuration
-- `CLOSEDAI_KEY`: the API key used to access the ClosedAI API
-- `CLOSEDAI_ENDPOINT` (defaults to `https://api.openai.com/v1`): the API endpoint which is used for the provider ClosedAI
-
 ### `ACTUAL_IPS` (optional)
 This is a security measure to make sure a proxy, VPN, Tor or any other IP hiding service is used by the host when accessing "Closed"AI's API.
 It is a space separated list of IP addresses that are allowed to access the API.
 You can also just add the *beginning* of an IP address, like `12.123.` (without an asterisk!) to allow all IPs starting with `12.123.`.
 > To disable the warning if you don't have this feature enabled, set `ACTUAL_IPS` to `None`.
 
-### `DEMO_AUTH`
-API key for demo purposes. You can give this to trusted team members. Never use it in production.
-
 ### `CORE_API_KEY`
-This will
+This specifies the **very secret key** for accessing the core API, which exposes the entire user database.
 
 ## Run
 > **Warning:** read the according section for production usage!
diff --git a/api/chat_balancing.py b/api/chat_balancing.py
deleted file mode 100644
index 3f450b0..0000000
--- a/api/chat_balancing.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import random
-import asyncio
-
-import chat_providers
-
-provider_modules = [
-    # chat_providers.twa,
-    chat_providers.quantum,
-    # chat_providers.churchless,
-    chat_providers.closed
-]
-
-async def balance(payload: dict) -> dict:
-    providers_available = []
-
-    for provider_module in provider_modules:
-        if payload['stream'] and not provider_module.STREAMING:
-            continue
-
-        if payload['model'] not in provider_module.MODELS:
-            continue
-
-        providers_available.append(provider_module)
-
-    provider = random.choice(providers_available)
-    return provider.chat_completion(**payload)
-
-if __name__ == '__main__':
-    req = asyncio.run(balance(payload={'model': 'gpt-3.5-turbo', 'stream': True}))
-    print(req['url'])
diff --git a/api/config/credits.yml b/api/config/credits.yml
new file mode 100644
index 0000000..e2f6807
--- /dev/null
+++ b/api/config/credits.yml
@@ -0,0 +1,23 @@
+max-credits: 100001
+start-credits: 1000
+
+costs:
+  other: 50
+
+  chat-models:
+    gpt-3: 10
+    gpt-4: 75
+    gpt-4-32k: 100
+
+# bonuses are multipliers for costs:
+# final_cost = cost * bonus
+bonuses:
+  owner: 0.1
+  admin: 0.3
+  helper: 0.4
+  booster: 0.5
+
+# discord reward 0.99^lvl?
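+
+# worked example (this is how transfer.py applies these values):
+#   a "helper" (bonus 0.4) calling gpt-4 (cost 75) is charged round(75 * 0.4) = 30 credits;
+#   roles without a bonus entry fall back to a multiplier of 1 and pay the full cost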
+
+rewards:
+  day: 1000
diff --git a/api/core.py b/api/core.py
index ddc6219..5895425 100644
--- a/api/core.py
+++ b/api/core.py
@@ -6,6 +6,7 @@ import fastapi
 
 from db import users
+from dhooks import Webhook, Embed
 from dotenv import load_dotenv
 
 load_dotenv()
@@ -31,6 +32,20 @@ async def get_users(discord_id: int, incoming_request: fastapi.Request):
 
     return user
 
+def new_user_webhook(user: dict) -> None:
+    dhook = Webhook(os.getenv('DISCORD_WEBHOOK__USER_CREATED'))
+
+    embed = Embed(
+        description='New User',
+        color=0x90ee90,
+    )
+
+    embed.add_field(name='ID', value=user['_id'], inline=False)
+    embed.add_field(name='Discord', value=user['auth']['discord'])
+    embed.add_field(name='Github', value=user['auth']['github'])
+
+    dhook.send(embed=embed)
+
 @router.post('/users')
 async def create_user(incoming_request: fastapi.Request):
     auth_error = await check_core_auth(incoming_request)
@@ -43,6 +58,17 @@ async def create_user(incoming_request: fastapi.Request):
         discord_id = payload.get('discord_id')
     except (json.decoder.JSONDecodeError, AttributeError):
         return fastapi.Response(status_code=400, content='Invalid or no payload received.')
-
+    user = await users.create(discord_id)
+    new_user_webhook(user)
+    return user
+
+if __name__ == '__main__':
+    new_user_webhook({
+        '_id': 'JUST_A_TEST_IGNORE_ME',
+        'auth': {
+            'discord': 123,
+            'github': 'abc'
+        }
+    })
diff --git a/api/db/logs.py b/api/db/logs.py
index fde9cec..5f9d851 100644
--- a/api/db/logs.py
+++ b/api/db/logs.py
@@ -4,30 +4,30 @@ import time
 from dotenv import load_dotenv
 from motor.motor_asyncio import AsyncIOMotorClient
 
+from helpers import network
+
 load_dotenv()
 
 def _get_mongo(collection_name: str):
     return AsyncIOMotorClient(os.getenv('MONGO_URI'))['nova-core'][collection_name]
 
-async def log_api_request(user, request, target_url):
-    payload = await request.json()
+async def log_api_request(user: dict, incoming_request, target_url: str):
+    payload = await incoming_request.json()
 
     last_prompt = None
     if 'messages' in payload:
         last_prompt = payload['messages'][-1]['content']
 
-    model = None
-    if 'model' in payload:
-        model = payload['model']
+    model = payload.get('model')
 
     new_log_item = {
         'timestamp': time.time(),
-        'method': request.method,
-        'path': request.url.path,
+        'method': incoming_request.method,
+        'path': incoming_request.url.path,
         'user_id': user['_id'],
         'security': {
-            'ip': request.client.host,
-            'useragent': request.headers.get('User-Agent')
+            'ip': network.get_ip(incoming_request),
+            'useragent': incoming_request.headers.get('User-Agent')
         },
         'details': {
             'model': model,
diff --git a/api/db/stats.py b/api/db/stats.py
new file mode 100644
index 0000000..d9d5edc
--- /dev/null
+++ b/api/db/stats.py
@@ -0,0 +1,42 @@
+import os
+import pytz
+import asyncio
+import datetime
+
+from dotenv import load_dotenv
+from motor.motor_asyncio import AsyncIOMotorClient
+
+load_dotenv()
+
+def _get_mongo(collection_name: str):
+    return AsyncIOMotorClient(os.getenv('MONGO_URI'))['nova-core'][collection_name]
+
+async def add_date():
+    date = datetime.datetime.now(pytz.timezone('GMT')).strftime('%Y.%m.%d')
+    year, month, day = date.split('.')
+
+    await _get_mongo('stats').update_one({}, {'$inc': {f'dates.{year}.{month}.{day}': 1}}, upsert=True)
+
+async def add_ip_address(ip_address: str):
+    ip_address = ip_address.replace('.', '_')
+    await _get_mongo('stats').update_one({}, {'$inc': {f'ips.{ip_address}': 1}}, upsert=True)
+
+async def add_target(url: str):
+    await _get_mongo('stats').update_one({}, {'$inc': {f'targets.{url}': 1}}, upsert=True)
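+
+# Each of these helpers increments a counter inside a single document of the
+# 'stats' collection; update_one({}, {'$inc': ...}, upsert=True) creates the
+# document and any missing counter field on first use, so no separate
+# initialisation step is needed.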
+
+async def add_tokens(tokens: int, model: str):
+    await _get_mongo('stats').update_one({}, {'$inc': {f'tokens.{model}': tokens}}, upsert=True)
+
+async def add_model(model: str):
+    await _get_mongo('stats').update_one({}, {'$inc': {f'models.{model}': 1}}, upsert=True)
+
+async def add_path(path: str):
+    path = path.replace('/', '_')
+    await _get_mongo('stats').update_one({}, {'$inc': {f'paths.{path}': 1}}, upsert=True)
+
+async def get_value(obj_filter):
+    return await _get_mongo('stats').find_one(obj_filter)
+
+if __name__ == '__main__':
+    asyncio.run(add_date())
+    asyncio.run(add_path('/__demo/test'))
diff --git a/api/db/users.py b/api/db/users.py
index abc6242..b2c3e31 100644
--- a/api/db/users.py
+++ b/api/db/users.py
@@ -1,4 +1,5 @@
 import os
+import yaml
 import random
 import string
 import asyncio
@@ -8,11 +9,15 @@ from motor.motor_asyncio import AsyncIOMotorClient
 
 load_dotenv()
 
+with open('config/credits.yml', encoding='utf8') as f:
+    credits_config = yaml.safe_load(f)
+
 def _get_mongo(collection_name: str):
     return AsyncIOMotorClient(os.getenv('MONGO_URI'))['nova-core'][collection_name]
 
 async def create(discord_id: int=0) -> dict:
     """Adds a new user to the MongoDB collection."""
+
     chars = string.ascii_letters + string.digits
 
     infix = os.getenv('KEYGEN_INFIX')
@@ -23,7 +28,7 @@ async def create(discord_id: int=0) -> dict:
 
     new_user = {
         'api_key': new_api_key,
-        'credits': 1000,
+        'credits': credits_config['start-credits'],
         'role': '',
         'status': {
             'active': True,
diff --git a/api/helpers/network.py b/api/helpers/network.py
new file mode 100644
index 0000000..eb19198
--- /dev/null
+++ b/api/helpers/network.py
@@ -0,0 +1,2 @@
+def get_ip(request) -> str:
+    return request.client.host
diff --git a/api/load_balancing.py b/api/load_balancing.py
new file mode 100644
index 0000000..b718540
--- /dev/null
+++ b/api/load_balancing.py
@@ -0,0 +1,42 @@
+import random
+import asyncio
+
+import chat_providers
+
+provider_modules = [
+    chat_providers.twa,
+    chat_providers.quantum,
+    chat_providers.churchless,
+    chat_providers.closed,
+    chat_providers.closed4
+]
+
+async def balance_chat_request(payload: dict) -> dict:
+    providers_available = []
+
+    for provider_module in provider_modules:
+        if payload['stream'] and not provider_module.STREAMING:
+            continue
+
+        if payload['model'] not in provider_module.MODELS:
+            continue
+
+        providers_available.append(provider_module)
+
+    provider = random.choice(providers_available)
+    return provider.chat_completion(**payload)
+
+async def balance_organic_request(request: dict) -> dict:
+    providers_available = []
+
+    for provider_module in provider_modules:
+        if provider_module.ORGANIC:
+            providers_available.append(provider_module)
+
+    provider = random.choice(providers_available)
+
+    return provider.organify(request)
+
+if __name__ == '__main__':
+    req = asyncio.run(balance_chat_request(payload={'model': 'gpt-3.5-turbo', 'stream': True}))
+    print(req['url'])
diff --git a/api/main.py b/api/main.py
index d7852e3..cb593fe 100644
--- a/api/main.py
+++ b/api/main.py
@@ -9,8 +9,6 @@ from dotenv import load_dotenv
 
 import core
 import transfer
 
-from db import users
-
 load_dotenv()
 app = fastapi.FastAPI()
diff --git a/api/moderation.py b/api/moderation.py
new file mode 100644
index 0000000..a885682
--- /dev/null
+++ b/api/moderation.py
@@ -0,0 +1,18 @@
+import os
+import asyncio
+import openai as closedai
+
+from typing import Union
+from dotenv import load_dotenv
+
+load_dotenv()
+
+closedai.api_key = os.getenv('LEGIT_CLOSEDAI_KEY')
+
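+# Returns True when the text passes the moderation check, i.e. it was *not* flagged.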
+async def is_safe(text: Union[str, list]) -> bool:
+    return not closedai.Moderation.create(
+        input=text,
+    )['results'][0]['flagged']
+
+if __name__ == '__main__':
+    print(asyncio.run(is_safe('Hello')))
diff --git a/api/netclient.py b/api/netclient.py
deleted file mode 100644
index 0d79c45..0000000
--- a/api/netclient.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import os
-import aiohttp
-import asyncio
-import aiohttp_socks
-
-from dotenv import load_dotenv
-
-import proxies
-
-from helpers import exceptions
-
-load_dotenv()
-
-async def stream(request: dict, demo_mode: bool=False):
-    headers = {
-        'Content-Type': 'application/json'
-    }
-
-    for k, v in request.get('headers', {}).items():
-        headers[k] = v
-
-    for _ in range(3):
-        async with aiohttp.ClientSession(connector=proxies.default_proxy.connector) as session:
-            async with session.get(
-                # 'GET',
-                'https://checkip.amazonaws.com/'
-            ) as response:
-                print(response.content)
-                print(type(response.content))
-
-                # html = await response.text()
-                # print(html)
-
-                # async with session.get(
-                #     method='GET',
-                #     url='https://checkip.amazonaws.com',
-                #     method=request.get('method', 'POST'),
-                #     url=request['url'],
-                #     json=request.get('payload', {}),
-                #     headers=headers,
-                #     timeout=aiohttp.ClientTimeout(total=float(os.getenv('TRANSFER_TIMEOUT', '120'))),
-                # ) as response:
-                #     try:
-                #         await response.raise_for_status()
-                #     except Exception as exc:
-                #         if 'Too Many Requests' in str(exc):
-                #             continue
-                #         else:
-                #             break
-
-                async for chunk in response.content.iter_chunks():
-                    # chunk = f'{chunk.decode("utf8")}\n\n'
-
-                    if demo_mode:
-                        print(chunk)
-
-                    yield chunk
-
-if __name__ == '__main__':
-    asyncio.run(stream({'method': 'GET', 'url': 'https://checkip.amazonaws.com'}, True))
-
diff --git a/api/streaming.py b/api/streaming.py
new file mode 100644
index 0000000..d1ca49f
--- /dev/null
+++ b/api/streaming.py
@@ -0,0 +1,107 @@
+import os
+import yaml
+import asyncio
+import aiohttp
+import starlette
+
+from dotenv import load_dotenv
+
+import proxies
+import load_balancing
+
+from db import logs, users, stats
+from rich import print
+from helpers import network
+
+load_dotenv()
+
+DEMO_PAYLOAD = {
+    'model': 'gpt-3.5-turbo',
+    'messages': [
+        {
+            'role': 'user',
+            'content': '1+1='
+        }
+    ]
+}
+
+with open('config/credits.yml', encoding='utf8') as f:
+    max_credits = yaml.safe_load(f)['max-credits']
+
+async def stream(
+    path: str='/v1/chat/completions',
+    user: dict=None,
+    payload: dict=None,
+    credits_cost: int=0,
+    demo_mode: bool=False,
+    input_tokens: int=0,
+    incoming_request: starlette.requests.Request=None,
+):
+    payload = payload or DEMO_PAYLOAD
+
+    if 'chat/completions' in path: # is a chat endpoint
+        target_request = await load_balancing.balance_chat_request(payload)
+    else:
+        target_request = await load_balancing.balance_organic_request(payload)
+
+    headers = {
+        'Content-Type': 'application/json'
+    }
+
+    for k, v in target_request.get('headers', {}).items():
+        headers[k] = v
+
+    for _ in range(5):
+        async with aiohttp.ClientSession(connector=proxies.default_proxy.connector) as session:
+            async with session.request(
+                method=target_request.get('method', 'POST'),
+                url=target_request['url'],
+
+                data=target_request.get('data'),
+                json=target_request.get('payload'),
+
+                headers=headers,
+                cookies=target_request.get('cookies'),
+
+                ssl=False,
+
+                timeout=aiohttp.ClientTimeout(total=float(os.getenv('TRANSFER_TIMEOUT', '120'))),
+            ) as response:
+                try:
+                    response.raise_for_status()
+                except Exception as exc:
+                    if 'Too Many Requests' in str(exc):
+                        continue
+                    else:
+                        break
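+
+                # A usable upstream response was obtained: log the request, deduct
+                # credits and record usage stats (where applicable) before relaying
+                # the response chunks back to the client.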
+                if user and incoming_request:
+                    await logs.log_api_request(
+                        user=user,
+                        incoming_request=incoming_request,
+                        target_url=target_request['url']
+                    )
+
+                if credits_cost and user:
+                    await users.update_by_id(user['_id'], {
+                        '$inc': {'credits': -credits_cost}
+                    })
+
+                if not demo_mode:
+                    await stats.add_date()
+                    await stats.add_ip_address(network.get_ip(incoming_request))
+                    await stats.add_model(payload.get('model', '_non-chat'))
+                    await stats.add_path(path)
+                    await stats.add_target(target_request['url'])
+                    await stats.add_tokens(input_tokens, payload.get('model', '_non-chat'))
+
+                async for chunk in response.content.iter_chunks():
+                    # chunk = f'{chunk.decode("utf8")}\n\n'
+
+                    if demo_mode:
+                        print(chunk)
+
+                    yield chunk
+
+if __name__ == '__main__':
+    # consume the demo stream (an async generator cannot be passed to asyncio.run directly)
+    async def _demo():
+        async for _ in stream(demo_mode=True):
+            pass
+
+    asyncio.run(_demo())
diff --git a/api/transfer.py b/api/transfer.py
index 1ef3545..04bdc20 100644
--- a/api/transfer.py
+++ b/api/transfer.py
@@ -2,13 +2,13 @@
 import os
 import json
+import yaml
 import logging
 import starlette
 
 from dotenv import load_dotenv
 
-import netclient
-import chat_balancing
+import streaming
 
 from db import logs, users
 from helpers import tokens, errors, exceptions
@@ -24,35 +24,41 @@ logging.basicConfig(
 
 logging.info('API started')
 
+with open('config/credits.yml', encoding='utf8') as f:
+    credits_config = yaml.safe_load(f)
+
 async def handle(incoming_request):
     """Transfer a streaming response from the incoming request to the target endpoint"""
 
     path = incoming_request.url.path
 
+    # METHOD
     if incoming_request.method not in ['GET', 'POST', 'PUT', 'DELETE', 'PATCH']:
         return errors.error(405, f'Method "{incoming_request.method}" is not allowed.', 'Change the request method to the correct one.')
 
+    # PAYLOAD
     try:
         payload = await incoming_request.json()
     except json.decoder.JSONDecodeError:
         payload = {}
 
+    # TOKENS
     try:
         input_tokens = tokens.count_for_messages(payload['messages'])
     except (KeyError, TypeError):
         input_tokens = 0
 
-    auth_header = incoming_request.headers.get('Authorization')
+    # AUTH
+    received_key = incoming_request.headers.get('Authorization')
 
-    if not auth_header:
+    if not received_key:
         return errors.error(401, 'No NovaAI API key given!', 'Add "Authorization: Bearer nv-..." to your request headers.')
 
-    received_key = auth_header
+    if received_key.startswith('Bearer '):
+        received_key = received_key.split('Bearer ')[1]
 
-    if auth_header.startswith('Bearer '):
-        received_key = auth_header.split('Bearer ')[1]
-
-    user = await users.by_api_key(received_key)
+    # USER
+    user = await users.by_api_key(received_key.strip())
 
     if not user:
         return errors.error(401, 'Invalid NovaAI API key!', 'Create a new NovaOSS API key.')
@@ -64,31 +70,36 @@ async def handle(incoming_request):
 
     if not user['status']['active']:
         return errors.error(418, 'Your NovaAI account is not active (paused).', 'Simply re-activate your account using a Discord command or the web panel.')
 
+    # COST
+    costs = credits_config['costs']
+    cost = costs['other']
+
+    if 'chat/completions' in path:
+        for model_name, model_cost in costs['chat-models'].items():
+            if model_name in payload['model']:
+                cost = model_cost
+
+    role_cost_multiplier = credits_config['bonuses'].get(user['role'], 1)
+    cost = round(cost * role_cost_multiplier)
+
+    if user['credits'] < cost:
+        return errors.error(429, 'Not enough credits.', 'Wait or earn more credits. Learn more on our website or Discord server.')
+
+    # READY
+    payload['user'] = str(user['_id'])
 
-    cost = 1
-
-    if '/chat/completions' in path:
-        cost = 5
-
-        if 'gpt-4' in payload['model']:
-            cost = 10
-
-    else:
-        return errors.error(404, f'Sorry, we don\'t support "{path}" yet. We\'re working on it.', 'Contact our team.')
-
     if not payload.get('stream') is True:
         payload['stream'] = False
 
-    if user['credits'] < cost:
-        return errors.error(429, 'Not enough credits.', 'You do not have enough credits to complete this request.')
-
-    await users.update_by_id(user['_id'], {'$inc': {'credits': -cost}})
-
-    target_request = await chat_balancing.balance(payload)
-
-    print(target_request['url'])
-
-    return errors.error(500, 'Sorry, the API is currenly under maintainance.', 'Please try again later.')
-
-    return starlette.responses.StreamingResponse(netclient.stream(target_request))
+    return starlette.responses.StreamingResponse(
+        content=streaming.stream(
+            user=user,
+            path=path,
+            payload=payload,
+            credits_cost=cost,
+            input_tokens=input_tokens,
+            incoming_request=incoming_request,
+        ),
+        media_type='text/event-stream'
+    )
diff --git a/tests/__main__.py b/tests/__main__.py
index aba7550..9f4db02 100644
--- a/tests/__main__.py
+++ b/tests/__main__.py
@@ -52,7 +52,7 @@ def test_api(model: str=MODEL, messages: List[dict]=None) -> dict:
     }
 
     response = httpx.post(
-        url=f'{api_endpoint}/chat/completions',
+        url=f'{api_endpoint}/v1/chat/completions',
        headers=headers,
        json=json_data,
        timeout=20
@@ -87,6 +87,7 @@ def test_all():
     print(test_library())
 
 if __name__ == '__main__':
-    api_endpoint = 'https://api.nova-oss.com'
+    # api_endpoint = 'https://api.nova-oss.com'
+    api_endpoint = 'http://localhost:2332'
     api_key = os.getenv('TEST_NOVA_KEY')
     test_all()