2023-08-04 17:29:49 +02:00
|
|
|
import base64
|
|
|
|
import asyncio
|
|
|
|
|
2023-08-04 03:30:56 +02:00
|
|
|
async def get_ip(request) -> str:
|
2023-08-12 17:49:31 +02:00
|
|
|
"""Get the IP address of the incoming request."""
|
|
|
|
|
2023-08-05 02:30:42 +02:00
|
|
|
xff = None
|
|
|
|
if request.headers.get('x-forwarded-for'):
|
|
|
|
xff, *_ = request.headers['x-forwarded-for'].split(', ')
|
|
|
|
|
|
|
|
possible_ips = [
|
|
|
|
xff,
|
|
|
|
request.headers.get('cf-connecting-ip'),
|
|
|
|
request.client.host
|
|
|
|
]
|
|
|
|
|
|
|
|
detected_ip = next((i for i in possible_ips if i), None)
|
|
|
|
|
|
|
|
return detected_ip
|
2023-08-04 17:29:49 +02:00
|
|
|
|
|
|
|
async def add_proxy_auth_to_headers(username: str, password: str, headers: dict) -> dict:
|
2023-08-12 17:49:31 +02:00
|
|
|
"""Add proxy authentication to the headers. This is useful if the proxy authentication doesn't work as it should."""
|
|
|
|
|
2023-08-04 17:29:49 +02:00
|
|
|
proxy_auth = base64.b64encode(f'{username}:{password}'.encode()).decode()
|
|
|
|
headers['Proxy-Authorization'] = f'Basic {proxy_auth}'
|
|
|
|
return headers
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
print(asyncio.run(add_proxy_auth_to_headers(
|
|
|
|
'user',
|
|
|
|
'pass',
|
|
|
|
{
|
|
|
|
'Authorization': 'Bearer demo',
|
|
|
|
'Another-Header': '123'
|
|
|
|
}
|
|
|
|
)))
|