Исходный код pyqiwip2p.notify.async_client

import asyncio
import hashlib
import hmac
import inspect

from hypercorn.asyncio import serve
from hypercorn.config import Config
from starlette import status
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route

from pyqiwip2p.p2p_types import Bill


[документация]class AioQiwiNotify: """ Штучка, чтобы работать с серверными уведомлениями Qiwi. Запускает сервер для приема уведомлений. А ещё тут есть хендлеры. :param auth_key: приватный ключ, на который настроен прием уведомлений. :type auth_key: ``str`` """ def __init__(self, auth_key: str, once=False): self.auth_key = auth_key self.handlers = [] self.once = once routes = [Route("/qiwi_notify", endpoint=self.server)] self.app = Starlette(routes=routes) def handler(self, func=None): def decorator(handler): self.handlers.append({"handler": handler, "filter": func}) return handler return decorator def _check_valid(self, bill: Bill, sha256: str): invoice_parameters = f"{bill.currency} | {bill.amount} | {bill.bill_id} | {bill.site_id} | {bill.status}" return bool( hmac.new( self.auth_key.encode(), invoice_parameters.encode(), hashlib.sha256 ).hexdigest() == sha256 ) async def _parse(self, response, sha256): bill = Bill(response) if self._check_valid(bill, sha256): loop = asyncio.get_event_loop() loop.create_task(self._check(bill)) return Response(status_code=status.HTTP_200_OK) return Response(status_code=status.HTTP_401_UNAUTHORIZED) async def _check(self, bill): for handler in self.handlers: if handler["filter"](bill): if inspect.iscoroutinefunction(handler["handler"]): await handler["handler"](bill) else: handler["handler"](bill) if self.once: break async def server(self, request: Request): await self._parse( await request.json(), request.headers["X-Api-Signature-SHA256"] ) def start(self, port: int = 12345, log_level="info"): config = Config() config.bind = [f"0.0.0.0:{port}"] asyncio.run(serve(self.app, config)) async def a_start(self, port: int = 12345, log_level="info"): config = Config() config.bind = [f"0.0.0.0:{port}"] await serve(self.app, Config())