Browse Source

added request lib to utils

dev
Jingxun 8 months ago
parent
commit
810d53b1c4
  1. 68
      frontend/utils/request_util.py

68
frontend/utils/request_util.py

@ -0,0 +1,68 @@
# encoding: utf-8
"""
@author: Qiancj
@contact: qiancj@risenenergy.com
@file: request_util
@create-time: 2023-09-27 09:03
@description: The new python script
"""
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class RequestToUrl:
def __init__(self, url_root, tags=None):
self.__url_root = url_root.rstrip("/")
self.__tags = tags.strip("/") if tags else tags
self.__headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Content-Type": "application/json;charset=utf-8",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ("
"KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
)
}
@property
def __session(self):
session = requests.Session()
retry = Retry(total=3)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
def update_headers(self, **kwargs):
self.__headers = dict(self.__headers, **kwargs)
def update_tags(self, tags):
self.__tags = tags
return self
def __get_url(self, path):
path = f'{path.strip("/")}/'
url = "/".join([self.__url_root, self.__tags, path])
return url
def get(self, path, *, timeout=None, **query_params):
url = self.__get_url(path)
try:
with self.__session as s:
result = s.get(url, params=query_params, headers=self.__headers, timeout=timeout).json()
except Exception:
result = {"status": 500, "data": None, "msg": "网络连接超时,请检查网络连接"}
return result
def post(self, path, *, timeout=None, **body_params):
url = self.__get_url(path)
try:
with self.__session as s:
result = s.post(url, json=body_params, headers=self.__headers, timeout=timeout).json()
except Exception:
result = {"status": 500, "data": None, "msg": "网络连接超时,请检查网络连接"}
return result
Loading…
Cancel
Save