Jingxun
8 months ago
3 changed files with 154 additions and 0 deletions
@ -0,0 +1,90 @@ |
|||
# encoding: utf-8 |
|||
""" |
|||
@author: Qiancj |
|||
@contact: qiancj@risenenergy.com |
|||
@file: models |
|||
@create-time: 2023-09-26 14:07 |
|||
@description: The new python script |
|||
""" |
|||
import binascii |
|||
import os |
|||
|
|||
from utils import mysql_db, gm_encrypt, redis_db, gm_decrypt |
|||
|
|||
|
|||
class LoginOperations: |
|||
_instance = None |
|||
|
|||
def __new__(cls, *args, **kw): |
|||
if not cls._instance: |
|||
cls._instance = object.__new__(cls) |
|||
return cls._instance |
|||
|
|||
def __init__(self, username, step, password=None): |
|||
self.__user = username |
|||
self.__step = step |
|||
self.__pwd = password |
|||
|
|||
def get_user_salt(self): |
|||
sql = f""" |
|||
select salt |
|||
from tb_users |
|||
where username='{self.__user}' and |
|||
step='{self.__step}' |
|||
""" |
|||
row = mysql_db.get_one(sql) |
|||
if not row: |
|||
return {"status": 400404, "data": None, "msg": "未查询到该用户请查实后重新登录"} |
|||
salt = row[0] |
|||
salt_out_value = gm_encrypt.sm2_encrypt(salt) |
|||
return { |
|||
"status": 200, "data": salt_out_value, "msg": "Sucess!" |
|||
} |
|||
|
|||
@property |
|||
def __generate_user_token(self): |
|||
return binascii.hexlify(os.urandom(32)).decode() |
|||
|
|||
@staticmethod |
|||
def __save_token(key, token): |
|||
with redis_db as r: |
|||
r.set(key, token) |
|||
|
|||
def login(self): |
|||
pwd = gm_decrypt.sm2_decrypt(self.__pwd) |
|||
sql = f""" |
|||
select is_admin |
|||
from tb_users |
|||
where username='{self.__user}' and |
|||
step='{self.__step}' and |
|||
concat(password)='{pwd}' |
|||
""" |
|||
row = mysql_db.get_one(sql) |
|||
if not row: |
|||
sql = f""" |
|||
select username |
|||
from tb_users |
|||
where username='{self.__user}' and |
|||
concat(password)='{pwd}' |
|||
""" |
|||
row = mysql_db.get_one(sql) |
|||
if not row: |
|||
return {"status": 400403, "data": None, "msg": "用户名或密码错误,请查实后重新登录"} |
|||
return {"status": 400200, "data": None, "msg": "该用户没有登录该岗位的权限,请重新选择正确岗位后登录"} |
|||
token = self.__generate_user_token |
|||
self.__save_token(f"{self.__user}_token", token) |
|||
return { |
|||
"status": 200, |
|||
"data": { |
|||
"user": self.__user, |
|||
"role": row[0], |
|||
"token": f"{self.__user}_token---{token}" |
|||
}, |
|||
"msg": "Sucess!" |
|||
} |
|||
|
|||
|
|||
def user_logout(user): |
|||
with redis_db as r: |
|||
r.delete(f"{user}_token") |
|||
return {"status": 200, "data": None, "msg": "用户已退出登录"} |
@ -0,0 +1,19 @@ |
|||
# encoding: utf-8 |
|||
""" |
|||
@author: Qiancj |
|||
@contact: qiancj@risenenergy.com |
|||
@file: request_body |
|||
@create-time: 2023-09-26 14:54 |
|||
@description: The new python script |
|||
""" |
|||
from pydantic import BaseModel |
|||
|
|||
|
|||
class UserInfo(BaseModel): |
|||
user: str |
|||
step: str |
|||
pwd: str |
|||
|
|||
|
|||
class Username(BaseModel): |
|||
user: str |
@ -0,0 +1,45 @@ |
|||
# encoding: utf-8 |
|||
""" |
|||
@author: Qiancj |
|||
@contact: qiancj@risenenergy.com |
|||
@file: account |
|||
@create-time: 2023-09-25 15:04 |
|||
@description: The new python script |
|||
""" |
|||
|
|||
from fastapi import APIRouter |
|||
|
|||
from decorators import cbv |
|||
from decorators import DecratorSet |
|||
from .models import LoginOperations, user_logout |
|||
from .request_body import UserInfo, Username |
|||
|
|||
account_router = APIRouter() |
|||
|
|||
|
|||
@cbv(account_router) |
|||
class LoginAPI: |
|||
|
|||
@account_router.post("/login/", tags=["account"]) |
|||
@DecratorSet.log_dec |
|||
def login(self, user_info: UserInfo): |
|||
user = user_info.user |
|||
step = user_info.step |
|||
pwd = user_info.pwd |
|||
out = LoginOperations(user, step, pwd).login() |
|||
return out |
|||
|
|||
@account_router.get("/get_user_salt/", tags=["account"]) |
|||
@DecratorSet.log_dec |
|||
def get_salt(self, user: str, step: str): |
|||
out = LoginOperations(user, step).get_user_salt() |
|||
return out |
|||
|
|||
|
|||
@cbv(account_router) |
|||
class LogoutAPI: |
|||
@account_router.post("/logout/", tags=["account"]) |
|||
def logout(self, user_info: Username): |
|||
user = user_info.user |
|||
out = user_logout(user) |
|||
return out |
Loading…
Reference in new issue