mirror of
https://github.com/NovaOSS/nova-api.git
synced 2024-11-25 20:43:56 +01:00
Fixed moderation, cleanup and other performance changes
This commit is contained in:
parent
0e4a0c60ec
commit
5fdc15c90f
29
api/after_request.py
Normal file
29
api/after_request.py
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
from db import logs, stats, users
|
||||||
|
from helpers import network
|
||||||
|
|
||||||
|
async def after_request(
|
||||||
|
incoming_request: dict,
|
||||||
|
target_request: dict,
|
||||||
|
user: dict,
|
||||||
|
credits_cost: int,
|
||||||
|
input_tokens: int,
|
||||||
|
path: str,
|
||||||
|
is_chat: bool,
|
||||||
|
model: str,
|
||||||
|
) -> None:
|
||||||
|
if user and incoming_request:
|
||||||
|
await logs.log_api_request(user=user, incoming_request=incoming_request, target_url=target_request['url'])
|
||||||
|
|
||||||
|
if credits_cost and user:
|
||||||
|
await users.manager.update_by_id(user['_id'], {'$inc': {'credits': -credits_cost}})
|
||||||
|
|
||||||
|
ip_address = await network.get_ip(incoming_request)
|
||||||
|
|
||||||
|
await stats.manager.add_date()
|
||||||
|
await stats.manager.add_ip_address(ip_address)
|
||||||
|
await stats.manager.add_path(path)
|
||||||
|
await stats.manager.add_target(target_request['url'])
|
||||||
|
|
||||||
|
if is_chat:
|
||||||
|
await stats.manager.add_model(model)
|
||||||
|
await stats.manager.add_tokens(input_tokens, model)
|
|
@ -61,6 +61,8 @@ class StatsManager:
|
||||||
db = await self._get_collection('stats')
|
db = await self._get_collection('stats')
|
||||||
return await db.find_one({obj_filter})
|
return await db.find_one({obj_filter})
|
||||||
|
|
||||||
|
manager = StatsManager()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
stats = StatsManager()
|
stats = StatsManager()
|
||||||
asyncio.run(stats.add_date())
|
asyncio.run(stats.add_date())
|
||||||
|
|
|
@ -101,6 +101,8 @@ class UserManager:
|
||||||
db = await self._get_collection('users')
|
db = await self._get_collection('users')
|
||||||
await db.delete_one({'_id': user_id})
|
await db.delete_one({'_id': user_id})
|
||||||
|
|
||||||
|
manager = UserManager()
|
||||||
|
|
||||||
async def demo():
|
async def demo():
|
||||||
user = await UserManager().create(69420)
|
user = await UserManager().create(69420)
|
||||||
print(user)
|
print(user)
|
||||||
|
|
|
@ -40,11 +40,6 @@ async def handle(incoming_request: fastapi.Request):
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
payload = {}
|
payload = {}
|
||||||
|
|
||||||
try:
|
|
||||||
input_tokens = await tokens.count_for_messages(payload.get('messages', []))
|
|
||||||
except (KeyError, TypeError):
|
|
||||||
input_tokens = 0
|
|
||||||
|
|
||||||
received_key = incoming_request.headers.get('Authorization')
|
received_key = incoming_request.headers.get('Authorization')
|
||||||
|
|
||||||
if not received_key or not received_key.startswith('Bearer '):
|
if not received_key or not received_key.startswith('Bearer '):
|
||||||
|
@ -70,8 +65,14 @@ async def handle(incoming_request: fastapi.Request):
|
||||||
|
|
||||||
policy_violation = False
|
policy_violation = False
|
||||||
if '/moderations' not in path:
|
if '/moderations' not in path:
|
||||||
if '/chat/completions' in path or ('input' in payload or 'prompt' in payload):
|
inp = ''
|
||||||
|
|
||||||
|
if 'input' in payload or 'prompt' in payload:
|
||||||
inp = payload.get('input', payload.get('prompt', ''))
|
inp = payload.get('input', payload.get('prompt', ''))
|
||||||
|
|
||||||
|
if isinstance(payload.get('messages'), list):
|
||||||
|
inp = '\n'.join([message['content'] for message in payload['messages']])
|
||||||
|
|
||||||
if inp and len(inp) > 2 and not inp.isnumeric():
|
if inp and len(inp) > 2 and not inp.isnumeric():
|
||||||
policy_violation = await moderation.is_policy_violated(inp)
|
policy_violation = await moderation.is_policy_violated(inp)
|
||||||
|
|
||||||
|
@ -104,7 +105,7 @@ async def handle(incoming_request: fastapi.Request):
|
||||||
path=path,
|
path=path,
|
||||||
payload=payload,
|
payload=payload,
|
||||||
credits_cost=cost,
|
credits_cost=cost,
|
||||||
input_tokens=input_tokens,
|
input_tokens=-1,
|
||||||
incoming_request=incoming_request,
|
incoming_request=incoming_request,
|
||||||
),
|
),
|
||||||
media_type=media_type
|
media_type=media_type
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
import tiktoken
|
import tiktoken
|
||||||
|
|
||||||
async def count_for_messages(messages: list, model: str='gpt-3.5-turbo-0613') -> int:
|
async def count_for_messages(messages: list, model: str='gpt-3.5-turbo-0613') -> int:
|
||||||
|
@ -57,3 +59,15 @@ for information on how messages are converted to tokens.""")
|
||||||
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
|
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
|
||||||
|
|
||||||
return num_tokens
|
return num_tokens
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
start = time.perf_counter()
|
||||||
|
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
'role': 'user',
|
||||||
|
'content': '1+1='
|
||||||
|
}
|
||||||
|
]
|
||||||
|
print(asyncio.run(count_for_messages(messages)))
|
||||||
|
print(f'Took {(time.perf_counter() - start) * 1000}ms')
|
||||||
|
|
|
@ -63,4 +63,5 @@ async def root():
|
||||||
|
|
||||||
@app.route('/v1/{path:path}', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
|
@app.route('/v1/{path:path}', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
|
||||||
async def v1_handler(request: fastapi.Request):
|
async def v1_handler(request: fastapi.Request):
|
||||||
return await handler.handle(request)
|
res = await handler.handle(request)
|
||||||
|
return res
|
||||||
|
|
|
@ -30,8 +30,6 @@ async def is_policy_violated(inp: Union[str, list]) -> bool:
|
||||||
else:
|
else:
|
||||||
text = '\n'.join(inp)
|
text = '\n'.join(inp)
|
||||||
|
|
||||||
print(f'[i] checking moderation for {text}')
|
|
||||||
|
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
req = await load_balancing.balance_organic_request(
|
req = await load_balancing.balance_organic_request(
|
||||||
{
|
{
|
||||||
|
@ -39,7 +37,6 @@ async def is_policy_violated(inp: Union[str, list]) -> bool:
|
||||||
'payload': {'input': text}
|
'payload': {'input': text}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
print(f'[i] moderation request sent to {req["url"]}')
|
|
||||||
|
|
||||||
async with aiohttp.ClientSession(connector=proxies.get_proxy().connector) as session:
|
async with aiohttp.ClientSession(connector=proxies.get_proxy().connector) as session:
|
||||||
try:
|
try:
|
||||||
|
@ -52,16 +49,13 @@ async def is_policy_violated(inp: Union[str, list]) -> bool:
|
||||||
headers=req.get('headers'),
|
headers=req.get('headers'),
|
||||||
cookies=req.get('cookies'),
|
cookies=req.get('cookies'),
|
||||||
ssl=False,
|
ssl=False,
|
||||||
timeout=aiohttp.ClientTimeout(total=2),
|
timeout=aiohttp.ClientTimeout(total=3),
|
||||||
) as res:
|
) as res:
|
||||||
res.raise_for_status()
|
res.raise_for_status()
|
||||||
json_response = await res.json()
|
json_response = await res.json()
|
||||||
print(json_response)
|
|
||||||
|
|
||||||
categories = json_response['results'][0]['category_scores']
|
categories = json_response['results'][0]['category_scores']
|
||||||
|
|
||||||
print(f'[i] moderation check took {time.perf_counter() - start:.2f}s')
|
|
||||||
|
|
||||||
if json_response['results'][0]['flagged']:
|
if json_response['results'][0]['flagged']:
|
||||||
return max(categories, key=categories.get)
|
return max(categories, key=categories.get)
|
||||||
|
|
||||||
|
|
|
@ -10,16 +10,13 @@ import starlette
|
||||||
|
|
||||||
from rich import print
|
from rich import print
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from python_socks._errors import ProxyError
|
|
||||||
|
|
||||||
import chunks
|
import chunks
|
||||||
import proxies
|
import proxies
|
||||||
import provider_auth
|
import provider_auth
|
||||||
|
import after_request
|
||||||
import load_balancing
|
import load_balancing
|
||||||
|
|
||||||
from db import logs
|
|
||||||
from db.users import UserManager
|
|
||||||
from db.stats import StatsManager
|
|
||||||
from helpers import network, chat, errors
|
from helpers import network, chat, errors
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
@ -54,13 +51,11 @@ async def stream(
|
||||||
If not streaming, it sends the result in its entirety.
|
If not streaming, it sends the result in its entirety.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
## Setup managers
|
|
||||||
db = UserManager()
|
|
||||||
stats = StatsManager()
|
|
||||||
|
|
||||||
is_chat = False
|
is_chat = False
|
||||||
is_stream = payload.get('stream', False)
|
is_stream = payload.get('stream', False)
|
||||||
|
|
||||||
|
model = None
|
||||||
|
|
||||||
if 'chat/completions' in path:
|
if 'chat/completions' in path:
|
||||||
is_chat = True
|
is_chat = True
|
||||||
model = payload['model']
|
model = payload['model']
|
||||||
|
@ -78,7 +73,6 @@ async def stream(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _ in range(5):
|
for _ in range(5):
|
||||||
|
|
||||||
# Load balancing: randomly selecting a suitable provider
|
# Load balancing: randomly selecting a suitable provider
|
||||||
# If the request is a chat completion, then we need to load balance between chat providers
|
# If the request is a chat completion, then we need to load balance between chat providers
|
||||||
# If the request is an organic request, then we need to load balance between organic providers
|
# If the request is an organic request, then we need to load balance between organic providers
|
||||||
|
@ -120,17 +114,19 @@ async def stream(
|
||||||
cookies=target_request.get('cookies'),
|
cookies=target_request.get('cookies'),
|
||||||
ssl=False,
|
ssl=False,
|
||||||
timeout=aiohttp.ClientTimeout(
|
timeout=aiohttp.ClientTimeout(
|
||||||
connect=60,
|
connect=2,
|
||||||
total=float(os.getenv('TRANSFER_TIMEOUT', '120'))
|
total=float(os.getenv('TRANSFER_TIMEOUT', '120'))
|
||||||
),
|
),
|
||||||
) as response:
|
) as response:
|
||||||
|
|
||||||
if response.status == 429:
|
if response.status == 429:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if response.content_type == 'application/json':
|
if response.content_type == 'application/json':
|
||||||
data = await response.json()
|
data = await response.json()
|
||||||
|
|
||||||
if data.get('code') == 'invalid_api_key':
|
if 'invalid_api_key' in str(data) or 'account_deactivated' in str(data):
|
||||||
|
print('[!] invalid api key', target_request.get('provider_auth'))
|
||||||
await provider_auth.invalidate_key(target_request.get('provider_auth'))
|
await provider_auth.invalidate_key(target_request.get('provider_auth'))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -155,19 +151,11 @@ async def stream(
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|
||||||
except ProxyError as exc:
|
except Exception as exc:
|
||||||
print('[!] aiohttp ProxyError')
|
print(f'[!] {type(exc)} - {exc}')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
except ConnectionResetError as exc:
|
if (not json_response) and is_chat:
|
||||||
print('[!] aiohttp ConnectionResetError')
|
|
||||||
continue
|
|
||||||
|
|
||||||
except aiohttp.client_exceptions.ClientConnectionError:
|
|
||||||
print('[!] aiohttp ClientConnectionError')
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not json_response and is_chat and is_stream:
|
|
||||||
print('[!] chat response is empty')
|
print('[!] chat response is empty')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -178,20 +166,16 @@ async def stream(
|
||||||
if not is_stream and json_response:
|
if not is_stream and json_response:
|
||||||
yield json.dumps(json_response)
|
yield json.dumps(json_response)
|
||||||
|
|
||||||
if user and incoming_request:
|
await after_request.after_request(
|
||||||
await logs.log_api_request(user=user, incoming_request=incoming_request, target_url=target_request['url'])
|
incoming_request=incoming_request,
|
||||||
|
target_request=target_request,
|
||||||
if credits_cost and user:
|
user=user,
|
||||||
await db.update_by_id(user['_id'], {'$inc': {'credits': -credits_cost}})
|
credits_cost=credits_cost,
|
||||||
|
input_tokens=input_tokens,
|
||||||
ip_address = await network.get_ip(incoming_request)
|
path=path,
|
||||||
await stats.add_date()
|
is_chat=is_chat,
|
||||||
await stats.add_ip_address(ip_address)
|
model=model,
|
||||||
await stats.add_path(path)
|
)
|
||||||
await stats.add_target(target_request['url'])
|
|
||||||
if is_chat:
|
|
||||||
await stats.add_model(model)
|
|
||||||
await stats.add_tokens(input_tokens, model)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
asyncio.run(stream())
|
asyncio.run(stream())
|
||||||
|
|
Loading…
Reference in a new issue