Compare commits

...

3 Commits

  1. 36
      frontend/utils/__init__.py
  2. 32
      frontend/utils/gmssl_pack.py
  3. 68
      frontend/utils/request_util.py

36
frontend/utils/__init__.py

@ -0,0 +1,36 @@
# encoding: utf-8
"""
@author: Qiancj
@contact: qiancj@risenenergy.com
@file: __init__.py
@create-time: 2023-09-21 09:21
@description: The new python script
"""
__all__ = [
"GmEncryptDecrypt",
"keyf",
"RequestToUrl",
"gm_encrypt",
"gm_decrypt",
"requester",
]
from .gmssl_pack import GmEncryptDecrypt
from .request_util import RequestToUrl
keyf = {
"pkf": "",
"pkb": "",
"sk": ""
}
gm_encrypt = GmEncryptDecrypt(
sm2_public_key=keyf.get("pkb")
)
gm_decrypt = GmEncryptDecrypt(
sm2_public_key=keyf.get("pkf"),
sm2_private_key=keyf.get("sk")
)
requester = RequestToUrl("http://127.0.0.1:8080/")

32
frontend/utils/gmssl_pack.py

@ -0,0 +1,32 @@
# encoding: utf-8
"""
@author: Qiancj
@contact: qiancj@risenenergy.com
@file: gmssl_pack
@create-time: 2023-09-21 09:22
@description: The new python script
"""
from gmssl.sm2 import CryptSM2
from gmssl.sm3 import sm3_hash
class GmEncryptDecrypt:
"""
国密sm4加解密
"""
def __init__(self, sm2_private_key=None, sm2_public_key=None, mode=1):
self.__sm2_endecrpt = CryptSM2(private_key=sm2_private_key, public_key=sm2_public_key, mode=mode)
def sm2_encrypt(self, text):
data = text.encode() if isinstance(text, str) else text
return self.__sm2_endecrpt.encrypt(data).hex()
def sm2_decrypt(self, msg):
data = bytes.fromhex(msg)
return self.__sm2_endecrpt.decrypt(data).decode()
@staticmethod
def sm3_encrypt(text):
return sm3_hash(bytearray(text, encoding="utf-8"))

68
frontend/utils/request_util.py

@ -0,0 +1,68 @@
# encoding: utf-8
"""
@author: Qiancj
@contact: qiancj@risenenergy.com
@file: request_util
@create-time: 2023-09-27 09:03
@description: The new python script
"""
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class RequestToUrl:
def __init__(self, url_root, tags=None):
self.__url_root = url_root.rstrip("/")
self.__tags = tags.strip("/") if tags else tags
self.__headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Content-Type": "application/json;charset=utf-8",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ("
"KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
)
}
@property
def __session(self):
session = requests.Session()
retry = Retry(total=3)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
def update_headers(self, **kwargs):
self.__headers = dict(self.__headers, **kwargs)
def update_tags(self, tags):
self.__tags = tags
return self
def __get_url(self, path):
path = f'{path.strip("/")}/'
url = "/".join([self.__url_root, self.__tags, path])
return url
def get(self, path, *, timeout=None, **query_params):
url = self.__get_url(path)
try:
with self.__session as s:
result = s.get(url, params=query_params, headers=self.__headers, timeout=timeout).json()
except Exception:
result = {"status": 500, "data": None, "msg": "网络连接超时,请检查网络连接"}
return result
def post(self, path, *, timeout=None, **body_params):
url = self.__get_url(path)
try:
with self.__session as s:
result = s.post(url, json=body_params, headers=self.__headers, timeout=timeout).json()
except Exception:
result = {"status": 500, "data": None, "msg": "网络连接超时,请检查网络连接"}
return result
Loading…
Cancel
Save