Исходный код pyqiwip2p.notify

import hashlib
import hmac
import json
import threading

import cherrypy

from pyqiwip2p.notify.async_client import AioQiwiNotify
from pyqiwip2p.p2p_types import Bill


[документация]class QiwiNotify: """ Штучка, чтобы работать с серверными уведомлениями Qiwi. Запускает сервер для приема уведомлений. А ещё тут есть хендлеры. :param auth_key: приватный ключ, на который настроен прием уведомлений. :type auth_key: ``str`` """ def __init__(self, auth_key: str, once=True): self.auth_key = auth_key self.handlers = [] self.once = once def handler(self, func=None): def decorator(handler): self.handlers.append({"handler": handler, "filter": func}) return handler return decorator def _check_valid(self, bill: Bill, sha256: str): invoice_parameters = f"{bill.currency} | {bill.amount} | {bill.bill_id} | {bill.site_id} | {bill.status}" return ( hmac.new( self.auth_key.encode(), invoice_parameters.encode(), hashlib.sha256 ).hexdigest() == sha256 ) def _parse(self, response, sha256): bill = Bill(response) if self._check_valid(bill, sha256): self._check(bill) else: raise cherrypy.HTTPError(403) def _check(self, bill): for handler in self.handlers: if handler["filter"](bill): handler["handler"](bill) if self.once: break
[документация] def start(qiwi, port: int = 8099): """ Функция для запуска веб-сервера, который будет обрабатывать входящие запросы и запускать захендленные функции. Внимание, предполагается проксирование, так как Qiwi отправляет запросы на 443 порт серверов с шифрованием доверенным сертификатом (SSL). Подниманиемый сервер не защищен никаким сертификатом и вообще ничего не может сделать для Qiwi. Рекомендую настроить Nginx. :param port: номер порта, на котором запустится сервер. :type port: ``int`` """ class WebhookServer(object): def check_headers(self, headers): if ( "content-type" in headers and "application/json" in headers["content-type"] and "X-Api-Signature-SHA256" in headers ): return True return False @cherrypy.expose def qiwi_notify(self): if self.check_headers(cherrypy.request.headers): json_string = cherrypy.request.body.read().decode("utf-8") qiwi._parse( json.loads(json_string), cherrypy.request.headers["X-Api-Signature-SHA256"], ) return "" raise cherrypy.HTTPError(403) cherrypy.config.update({"server.socket_port": port}) t = threading.Thread( target=lambda ws: cherrypy.quickstart(ws()), args=(WebhookServer,) ) t.start()