-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
243 lines (202 loc) · 8.11 KB
/
auth.py
File metadata and controls
243 lines (202 loc) · 8.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import os
import time
import jwt
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from fastapi_sso.sso.google import GoogleSSO
from fastapi_sso.sso.yandex import YandexSSO
import database
import models
# Every endpoint defined in this module is mounted under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])

# --- JWT settings -----------------------------------------------------------
# NOTE(review): the fallback secret is a public placeholder; confirm that
# JWT_SECRET is always provided in deployed environments.
JWT_SECRET = os.environ.get("JWT_SECRET", "super-secret-key-change-me")
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_DAYS = 30

# --- OAuth client credentials (empty string when the variable is unset) -----
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "")
YANDEX_CLIENT_ID = os.environ.get("YANDEX_CLIENT_ID", "")
YANDEX_CLIENT_SECRET = os.environ.get("YANDEX_CLIENT_SECRET", "")

# Base URL used to build the OAuth callback addresses below.
HOST_URL = os.environ.get("HOST_URL", "http://localhost:5173")

# True only when ENVIRONMENT is explicitly set to something other than
# "production"; forwarded to the SSO clients as allow_insecure_http.
_allow_insecure = os.environ.get("ENVIRONMENT", "production") != "production"

# --- SSO clients for the web flow --------------------------------------------
google_sso = GoogleSSO(
    client_id=GOOGLE_CLIENT_ID,
    client_secret=GOOGLE_CLIENT_SECRET,
    redirect_uri=f"{HOST_URL}/api/auth/google/callback",
    allow_insecure_http=_allow_insecure,
)
yandex_sso = YandexSSO(
    client_id=YANDEX_CLIENT_ID,
    client_secret=YANDEX_CLIENT_SECRET,
    redirect_uri=f"{HOST_URL}/api/auth/yandex/callback",
    allow_insecure_http=_allow_insecure,
)

# --- SSO clients for the mobile flow (separate callback URLs) ----------------
google_sso_mobile = GoogleSSO(
    client_id=GOOGLE_CLIENT_ID,
    client_secret=GOOGLE_CLIENT_SECRET,
    redirect_uri=f"{HOST_URL}/api/auth/google/callback/mobile",
    allow_insecure_http=_allow_insecure,
)
yandex_sso_mobile = YandexSSO(
    client_id=YANDEX_CLIENT_ID,
    client_secret=YANDEX_CLIENT_SECRET,
    redirect_uri=f"{HOST_URL}/api/auth/yandex/callback/mobile",
    allow_insecure_http=_allow_insecure,
)
def create_jwt_token(user_id: str) -> str:
    """Issue a signed JWT for *user_id*.

    The token stores the user id in the ``sub`` claim and an ``exp`` claim
    set ``JWT_EXPIRATION_DAYS`` days after the moment of the call. It is
    signed with ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    lifetime_seconds = JWT_EXPIRATION_DAYS * 24 * 60 * 60
    claims = {"sub": user_id, "exp": time.time() + lifetime_seconds}
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
def get_current_user_id(request: Request) -> Optional[str]:
    """Resolve the authenticated user's id from an incoming request.

    The token is taken from the ``Authorization: Bearer <token>`` header when
    present, otherwise from the ``dubtab_token`` cookie.

    Args:
        request: The incoming request whose headers and cookies are inspected.

    Returns:
        The ``sub`` claim of the token, or ``None`` when no token is supplied
        or the token fails JWT validation.
    """
    token = None

    # Split on any run of whitespace so that "Bearer   <token>" still yields
    # the token rather than an empty string, and compare the scheme
    # case-insensitively because HTTP auth schemes are case-insensitive.
    header_parts = request.headers.get("Authorization", "").split()
    if len(header_parts) >= 2 and header_parts[0].lower() == "bearer":
        token = header_parts[1]

    # Fall back to the cookie when the header carried no bearer token.
    if not token:
        token = request.cookies.get("dubtab_token")
    if not token:
        return None

    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        # Any validation failure is reported as "not authenticated".
        return None
    return payload.get("sub")
def get_current_user_id_sync(token: str) -> Optional[str]:
    """Return the ``sub`` claim of a raw JWT string.

    Yields ``None`` when *token* is empty or when decoding it with
    ``JWT_SECRET`` / ``JWT_ALGORITHM`` raises a ``jwt.PyJWTError``.
    """
    if not token:
        return None

    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        return None
    return claims.get("sub")
@router.get("/google/login")
async def google_login():
    """Begin the web Google sign-in flow.

    Returns whatever ``google_sso.get_login_redirect()`` produces.
    """
    with google_sso:
        login_redirect = await google_sso.get_login_redirect()
    return login_redirect
@router.get("/google/callback")
async def google_callback(request: Request, db: Session = Depends(database.get_db)):
    """Finish the web Google sign-in flow.

    Verifies the callback request with ``google_sso`` and hands the resulting
    profile to ``handle_sso_login`` with the provider name ``"google"``.
    """
    with google_sso:
        profile = await google_sso.verify_and_process(request)
    return handle_sso_login(profile, "google", db)
@router.get("/yandex/login")
async def yandex_login():
    """Begin the web Yandex sign-in flow.

    Returns whatever ``yandex_sso.get_login_redirect()`` produces.
    """
    with yandex_sso:
        login_redirect = await yandex_sso.get_login_redirect()
    return login_redirect
@router.get("/yandex/callback")
async def yandex_callback(request: Request, db: Session = Depends(database.get_db)):
    """Finish the web Yandex sign-in flow.

    Verifies the callback request with ``yandex_sso`` and hands the resulting
    profile to ``handle_sso_login`` with the provider name ``"yandex"``.
    """
    with yandex_sso:
        profile = await yandex_sso.verify_and_process(request)
    return handle_sso_login(profile, "yandex", db)
@router.get("/google/login/mobile")
async def google_login_mobile():
    """Begin the mobile Google sign-in flow.

    Returns whatever ``google_sso_mobile.get_login_redirect()`` produces.
    """
    with google_sso_mobile:
        login_redirect = await google_sso_mobile.get_login_redirect()
    return login_redirect
@router.get("/google/callback/mobile")
async def google_callback_mobile(request: Request, db: Session = Depends(database.get_db)):
    """Finish the mobile Google sign-in flow.

    Verifies the callback request with ``google_sso_mobile`` and hands the
    resulting profile to ``handle_sso_login_mobile`` with the provider name
    ``"google"``.
    """
    with google_sso_mobile:
        profile = await google_sso_mobile.verify_and_process(request)
    return handle_sso_login_mobile(profile, "google", db)
@router.get("/yandex/login/mobile")
async def yandex_login_mobile():
    """Begin the mobile Yandex sign-in flow.

    Returns whatever ``yandex_sso_mobile.get_login_redirect()`` produces.
    """
    with yandex_sso_mobile:
        login_redirect = await yandex_sso_mobile.get_login_redirect()
    return login_redirect
@router.get("/yandex/callback/mobile")
async def yandex_callback_mobile(request: Request, db: Session = Depends(database.get_db)):
    """Finish the mobile Yandex sign-in flow.

    Verifies the callback request with ``yandex_sso_mobile`` and hands the
    resulting profile to ``handle_sso_login_mobile`` with the provider name
    ``"yandex"``.
    """
    with yandex_sso_mobile:
        profile = await yandex_sso_mobile.verify_and_process(request)
    return handle_sso_login_mobile(profile, "yandex", db)
def handle_sso_login_mobile(user_info, provider: str, db: Session):
    """Complete a mobile SSO login and redirect back into the app.

    A missing *user_info* redirects to ``dubtab://auth?error=failed``.
    Otherwise the user row keyed by ``"<provider>_<provider user id>"`` is
    created, or has its name and picture refreshed, and the app is redirected
    to a ``dubtab://auth`` URL carrying a freshly issued JWT and the provider.
    """
    if not user_info:
        return RedirectResponse(url="dubtab://auth?error=failed")

    user_id = f"{provider}_{user_info.id}"
    account = db.query(models.User).filter(models.User.id == user_id).first()
    if account:
        # Returning user: refresh the profile fields copied from the provider.
        account.name = user_info.display_name
        account.picture = user_info.picture
    else:
        # First login through this provider: create the user row.
        db.add(
            models.User(
                id=user_id,
                email=user_info.email,
                name=user_info.display_name,
                picture=user_info.picture,
                provider=provider,
            )
        )
    db.commit()

    token = create_jwt_token(user_id)
    return RedirectResponse(url=f"dubtab://auth?token={token}&provider={provider}")
def handle_sso_login(user_info, provider: str, db: Session):
    """Complete a web SSO login and set the session cookie.

    The user row keyed by ``"<provider>_<provider user id>"`` is created on
    first login; on later logins its name and picture are refreshed. A JWT is
    then issued and stored in the ``dubtab_token`` cookie.

    Args:
        user_info: Profile returned by the SSO client, or a falsy value when
            verification produced nothing.
        provider: Provider name used as the user-id prefix and stored on the
            user row.
        db: Database session used to look up and persist the user.

    Returns:
        A redirect to ``/`` carrying the ``dubtab_token`` cookie.

    Raises:
        HTTPException: 401 when *user_info* is falsy.
    """
    if not user_info:
        raise HTTPException(status_code=401, detail="SSO authentication failed")

    user_id = f"{provider}_{user_info.id}"
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        user = models.User(
            id=user_id,
            email=user_info.email,
            name=user_info.display_name,
            picture=user_info.picture,
            provider=provider,
        )
        db.add(user)
    else:
        user.name = user_info.display_name
        user.picture = user_info.picture
    db.commit()

    token = create_jwt_token(user_id)

    # The token is deliberately kept out of the redirect URL; it travels only
    # in an HttpOnly cookie, and the frontend can confirm the login through
    # /api/auth/me.
    response = RedirectResponse(url="/")
    response.set_cookie(
        "dubtab_token",
        token,
        max_age=JWT_EXPIRATION_DAYS * 24 * 60 * 60,
        httponly=True,
        samesite="lax",
        # Mark the cookie Secure whenever the site is served over HTTPS so the
        # long-lived token is never sent over plain HTTP. Plain-HTTP setups
        # (e.g. the localhost default) keep working without the flag.
        secure=HOST_URL.lower().startswith("https://"),
    )
    return response
@router.get("/me")
async def get_me(request: Request, db: Session = Depends(database.get_db)):
    """Return the profile of the currently authenticated user.

    Responds with 401 when the request carries no valid token and with 404
    when the token's user id has no matching row.
    """
    user_id = get_current_user_id(request)
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    account = db.query(models.User).filter(models.User.id == user_id).first()
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    exposed_fields = ("id", "email", "name", "picture", "provider")
    return {field: getattr(account, field) for field in exposed_fields}
@router.get("/me/rooms")
async def get_my_rooms(request: Request, db: Session = Depends(database.get_db)):
    """List the rooms owned by the authenticated user, most recently active first.

    Responds with 401 when the request carries no valid token.
    """
    user_id = get_current_user_id(request)
    if not user_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    owned_rooms = (
        db.query(models.Room)
        .filter(models.Room.owner_id == user_id)
        .order_by(models.Room.last_activity.desc())
        .all()
    )
    return [
        {"id": room.id, "ttl": room.ttl, "last_activity": room.last_activity}
        for room in owned_rooms
    ]
@router.post("/logout")
async def logout():
    """Log the caller out by deleting the ``dubtab_token`` cookie.

    Returns a small JSON body ``{"status":"ok"}``.
    """
    reply = Response(media_type="application/json", content='{"status":"ok"}')
    reply.delete_cookie("dubtab_token")
    return reply
@router.get("/token")
async def get_token(request: Request):
    """Return the caller's current JWT so it can be used with the CLI.

    The token is read from the ``dubtab_token`` cookie first and, failing
    that, from an ``Authorization: Bearer <token>`` header.

    Returns:
        A dict with the raw ``token``, the ``user_id`` from its ``sub`` claim
        (empty string when the claim is absent) and a usage ``hint``.

    Raises:
        HTTPException: 401 when no token is supplied or it fails validation.
    """
    token = request.cookies.get("dubtab_token")
    if not token:
        # Split on any run of whitespace so that "Bearer   <token>" still
        # yields the token, and match the scheme case-insensitively because
        # HTTP auth schemes are case-insensitive.
        header_parts = request.headers.get("Authorization", "").split()
        if len(header_parts) >= 2 and header_parts[0].lower() == "bearer":
            token = header_parts[1]
    if not token:
        raise HTTPException(status_code=401, detail="Не авторизован. Сначала войди на dubtab.app")

    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        # The decode failure adds nothing for the client; drop the chained
        # context so the 401 is reported on its own.
        raise HTTPException(status_code=401, detail="Токен невалиден") from None

    return {
        "token": token,
        "user_id": payload.get("sub", ""),
        "hint": "Скопируй token и вставь в: dubtab config --token <token>",
    }