Coverage for auth\jwt_handler.py: 80%

25 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-02-05 19:00 +0800

1import time 

2from datetime import datetime 

3 

4from database.connection import Settings 

5from fastapi import HTTPException, status 

6from jose import jwt, JWTError 

7from models.users import User 

8 

9settings = Settings() 

10 

11 

def create_access_token(user: str, expires_in: float = 3600) -> str:
    """Create a signed HS256 JWT for ``user``.

    Args:
        user: Value stored under the ``"user"`` claim of the token.
        expires_in: Token lifetime in seconds, counted from the moment of
            creation. Defaults to 3600 (one hour), the lifetime that was
            previously hard-coded.

    Returns:
        The encoded token string, signed with ``settings.SECRET_KEY``.
    """
    payload = {
        "user": user,
        # Absolute expiry as a Unix timestamp. It is stored under the custom
        # "expires" claim (not the registered "exp" claim), so it has to be
        # checked by application code when the token is verified.
        "expires": time.time() + expires_in,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")

20 

21 

async def verify_access_token(token: str) -> dict:
    """Decode ``token`` and validate its expiry and its user.

    Args:
        token: Encoded JWT, expected to be signed with
            ``settings.SECRET_KEY`` using HS256.

    Returns:
        The decoded claims dictionary.

    Raises:
        HTTPException: 400 if the token cannot be decoded, has no
            ``"expires"`` claim, carries a malformed ``"expires"`` or
            ``"user"`` claim, or names a user that does not exist;
            403 if the token has expired.
    """
    # Keep the try body to the single call that can raise JWTError.
    try:
        data = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except JWTError as err:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token",
        ) from err

    expire = data.get("expires")
    if expire is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No access token supplied",
        )

    # A non-numeric, NaN or infinite expiry would otherwise surface as an
    # unhandled exception (HTTP 500) or as a token that never expires.
    is_finite_number = (
        isinstance(expire, (int, float))
        and float("-inf") < expire < float("inf")
    )
    if not is_finite_number:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token",
        )

    # "expires" is a Unix timestamp, so compare epoch seconds directly
    # instead of building naive datetimes via the deprecated
    # utcnow()/utcfromtimestamp() helpers.
    if time.time() > expire:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Token expired!",
        )

    # A validly signed token without a "user" claim is rejected as invalid
    # rather than raising KeyError.
    email = data.get("user")
    user_exist = None
    if email is not None:
        user_exist = await User.find_one(User.email == email)
    if not user_exist:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token",
        )

    return data