|
| 1 | +from dataclasses import dataclass |
| 2 | +from datetime import date, datetime |
| 3 | +from types import TracebackType |
| 4 | +from typing import Literal, Self |
| 5 | + |
| 6 | +from sqlalchemy import select, func |
| 7 | + |
| 8 | +from src.app.auth.artists.models import ArtistProfile |
| 9 | +from src.app.auth.producers.models import ProducerProfile |
| 10 | +from src.app.auth.users.interfaces.da.models import User |
| 11 | +from src.app.music.albums.interfaces.da.models import Album |
| 12 | +from src.app.music.squads.models import Squad |
| 13 | +from src.app.social.licenses.models import License |
| 14 | +from src.app.social.playlists.models import Playlist |
| 15 | +from src.app.social.tags.models import Tag |
| 16 | +from src.domain.auth.users.interfaces.da.dao import DAO |
| 17 | +from src.domain.auth.users.interfaces.da.models import AccessLevel, PremiumLevel |
| 18 | +from src.infrastructure.loggers import app as logger |
| 19 | +from src.infrastructure.postgres import PostgresSessionMixin |
| 20 | + |
| 21 | + |
| 22 | +@dataclass |
| 23 | +class PostgresDAOImplementation(DAO, PostgresSessionMixin): |
| 24 | + def __init__(self) -> None: |
| 25 | + super(PostgresDAOImplementation, self).__init__(table=User) |
| 26 | + self.table = User |
| 27 | + |
| 28 | + async def get_user_by_id(self, user_id: int) -> User | None: |
| 29 | + logger.info("get_user_by_id DAO request") |
| 30 | + response: User | None = await self.read(obj_id=user_id) |
| 31 | + return response |
| 32 | + |
| 33 | + async def get_user_by_username(self, username: str) -> User | None: |
| 34 | + logger.info("get_user_by_username DAO request") |
| 35 | + response: User | None = await self.run( |
| 36 | + statement=select(User).group_by(User.id).filter(User.username == username), |
| 37 | + method="scalar", |
| 38 | + ) |
| 39 | + return response |
| 40 | + |
| 41 | + async def get_users(self) -> list[User]: # type: ignore[override] |
| 42 | + logger.info("get_users DAO request") |
| 43 | + response: list[User] = await self.run( |
| 44 | + statement=select(User).group_by(User.id).order_by(User.updated_at), |
| 45 | + method="scalars", |
| 46 | + ) |
| 47 | + return response |
| 48 | + |
| 49 | + async def count_users(self) -> int: |
| 50 | + logger.info("count_users DAO request") |
| 51 | + response: int = await self.run( |
| 52 | + statement=select(func.count(User.id)), |
| 53 | + method="scalar", |
| 54 | + ) |
| 55 | + return response |
| 56 | + |
| 57 | + async def create_user( |
| 58 | + self, |
| 59 | + email: str, |
| 60 | + password: str, |
| 61 | + username: str, |
| 62 | + created_at: date, |
| 63 | + updated_at: datetime, |
| 64 | + access_level: Literal["user", "admin", "superuser"], |
| 65 | + premium_level: Literal["none", "bot", "full"], |
| 66 | + is_verified: bool, |
| 67 | + artist_id: int, |
| 68 | + producer_id: int, |
| 69 | + licenses_ids: list[int], |
| 70 | + followed_squads_ids: list[int], |
| 71 | + followed_artists_ids: list[int], |
| 72 | + coauthored_playlists_ids: list[int], |
| 73 | + saved_playlists_ids: list[int], |
| 74 | + followed_producers_ids: list[int], |
| 75 | + saved_albums_ids: list[int], |
| 76 | + followed_tags: list[str], |
| 77 | + telegram_id: int | None = None, |
| 78 | + description: str | None = None, |
| 79 | + picture_url: str | None = None, |
| 80 | + ) -> int: |
| 81 | + logger.info("create_user DAO request") |
| 82 | + user = User( |
| 83 | + username=username, |
| 84 | + description=description, |
| 85 | + email=email, |
| 86 | + password=password, |
| 87 | + picture_url=picture_url, |
| 88 | + created_at=created_at, |
| 89 | + updated_at=updated_at, |
| 90 | + access_level=access_level, |
| 91 | + premium_level=premium_level, |
| 92 | + telegram_id=telegram_id, |
| 93 | + is_verified=is_verified, |
| 94 | + artist_id=artist_id, |
| 95 | + producer_id=producer_id, |
| 96 | + artist_profile=await self.run( |
| 97 | + statement=select(ArtistProfile).group_by(ArtistProfile.id).filter(ArtistProfile.id == artist_id), |
| 98 | + method="scalar", |
| 99 | + ), |
| 100 | + producer_profile=await self.run( |
| 101 | + statement=select(ProducerProfile).group_by(ProducerProfile.id).filter(ProducerProfile.id == producer_id), |
| 102 | + method="scalar", |
| 103 | + ), |
| 104 | + licenses=await self.run( |
| 105 | + statement=select(License).group_by(License.id).filter(License.id.in_(licenses_ids)), |
| 106 | + method="scalars", |
| 107 | + ) if licenses_ids else list(), |
| 108 | + followed_squads=await self.run( |
| 109 | + statement=select(Squad).group_by(Squad.id).filter(Squad.id.in_(followed_squads_ids)), |
| 110 | + method="scalars", |
| 111 | + ) if followed_squads_ids else list(), |
| 112 | + followed_artists=await self.run( |
| 113 | + statement=select(ArtistProfile).group_by(ArtistProfile.id).filter(ArtistProfile.id.in_(followed_artists_ids)), |
| 114 | + method="scalars", |
| 115 | + ) if followed_artists_ids else list(), |
| 116 | + coauthored_playlists=await self.run( |
| 117 | + statement=select(Playlist).group_by(Playlist.id).filter(Playlist.id.in_(coauthored_playlists_ids)), |
| 118 | + method="scalars", |
| 119 | + ) if coauthored_playlists_ids else list(), |
| 120 | + saved_playlists=await self.run( |
| 121 | + statement=select(Playlist).group_by(Playlist.id).filter(Playlist.id.in_(saved_playlists_ids)), |
| 122 | + method="scalars", |
| 123 | + ) if saved_playlists_ids else list(), |
| 124 | + followed_producers=await self.run( |
| 125 | + statement=select(ProducerProfile).group_by(ProducerProfile.id).filter(ProducerProfile.id.in_(followed_producers_ids)), |
| 126 | + method="scalars", |
| 127 | + ) if followed_producers_ids else list(), |
| 128 | + saved_albums=await self.run( |
| 129 | + statement=select(Album).group_by(Album.id).filter(Album.id.in_(saved_albums_ids)), |
| 130 | + method="scalars", |
| 131 | + ) if saved_albums_ids else list(), |
| 132 | + followed_tags=await self.run( |
| 133 | + statement=select(Tag).group_by(Tag.id).filter(Tag.name.in_(followed_tags)), |
| 134 | + method="scalars", |
| 135 | + ) if followed_tags else list(), |
| 136 | + ) |
| 137 | + await self.create(user) |
| 138 | + return user.id |
| 139 | + |
| 140 | + async def update_user( |
| 141 | + self, |
| 142 | + user_id: int, |
| 143 | + username: str | None = None, |
| 144 | + description: str | None = None, |
| 145 | + email: str | None = None, |
| 146 | + password: str | None = None, |
| 147 | + picture_url: str | None = None, |
| 148 | + created_at: date | None = None, |
| 149 | + updated_at: datetime | None = None, |
| 150 | + access_level: AccessLevel | None = None, |
| 151 | + premium_level: PremiumLevel | None = None, |
| 152 | + artist_id: int | None = None, |
| 153 | + producer_id: int | None = None, |
| 154 | + telegram_id: int | None = None, |
| 155 | + is_verified: bool | None = None, |
| 156 | + licenses_ids: list[int] | None = None, |
| 157 | + followed_squads_ids: list[int] | None = None, |
| 158 | + followed_artists_ids: list[int] | None = None, |
| 159 | + coauthored_playlists_ids: list[int] | None = None, |
| 160 | + saved_playlists_ids: list[int] | None = None, |
| 161 | + followed_producers_ids: list[int] | None = None, |
| 162 | + saved_albums_ids: list[int] | None = None, |
| 163 | + followed_tags: list[str] | None = None, |
| 164 | + ) -> int: |
| 165 | + logger.info("update_user DAO request") |
| 166 | + user = User(**dict(filter( |
| 167 | + lambda item: bool(item[1]), |
| 168 | + { |
| 169 | + "id": user_id, |
| 170 | + "username": username if username else None, |
| 171 | + "description": description if description else None, |
| 172 | + "email": email if email else None, |
| 173 | + "password": password if password else None, |
| 174 | + "picture_url": picture_url if picture_url else None, |
| 175 | + "created_at": created_at if created_at else None, |
| 176 | + "updated_at": updated_at if updated_at else None, |
| 177 | + "access_level": str(access_level) if access_level else None, |
| 178 | + "premium_level": str(premium_level) if premium_level else None, |
| 179 | + "artist_id": artist_id if artist_id else None, |
| 180 | + "producer_id": producer_id if producer_id else None, |
| 181 | + "telegram_id": telegram_id if telegram_id else None, |
| 182 | + "is_verified": is_verified if is_verified else None, |
| 183 | + "artist_profile": await self.run( |
| 184 | + statement=select(ArtistProfile).group_by(ArtistProfile.id).filter(ArtistProfile.id == artist_id), |
| 185 | + method="scalar", |
| 186 | + ) if artist_id else None, |
| 187 | + "profile_profile": await self.run( |
| 188 | + statement=select(ProducerProfile).group_by(ProducerProfile.id).filter(ProducerProfile.id == producer_id), |
| 189 | + method="scalar", |
| 190 | + ), |
| 191 | + "licenses": await self.run( |
| 192 | + statement=select(License).group_by(License.id).filter(License.id.in_(licenses_ids)), |
| 193 | + method="scalars", |
| 194 | + ) if licenses_ids else list(), |
| 195 | + "followed_squads": await self.run( |
| 196 | + statement=select(Squad).group_by(Squad.id).filter(Squad.id.in_(followed_squads_ids)), |
| 197 | + method="scalars", |
| 198 | + ) if followed_squads_ids else list(), |
| 199 | + "followed_artists": await self.run( |
| 200 | + statement=select(ArtistProfile).group_by(ArtistProfile.id).filter(ArtistProfile.id.in_(followed_artists_ids)), |
| 201 | + method="scalars", |
| 202 | + ) if followed_artists_ids else list(), |
| 203 | + "coauthored_playlists": await self.run( |
| 204 | + statement=select(Playlist).group_by(Playlist.id).filter(Playlist.id.in_(coauthored_playlists_ids)), |
| 205 | + method="scalars", |
| 206 | + ) if coauthored_playlists_ids else list(), |
| 207 | + "saved_playlists": await self.run( |
| 208 | + statement=select(Playlist).group_by(Playlist.id).filter(Playlist.id.in_(saved_playlists_ids)), |
| 209 | + method="scalars", |
| 210 | + ) if saved_playlists_ids else list(), |
| 211 | + "followed_producers": await self.run( |
| 212 | + statement=select(ProducerProfile).group_by(ProducerProfile.id).filter(ProducerProfile.id.in_(followed_producers_ids)), |
| 213 | + method="scalars", |
| 214 | + ) if followed_producers_ids else list(), |
| 215 | + "saved_albums": await self.run( |
| 216 | + statement=select(Album).group_by(Album.id).filter(Album.id.in_(saved_albums_ids)), |
| 217 | + method="scalars", |
| 218 | + ) if saved_albums_ids else list(), |
| 219 | + "followed_tags": await self.run( |
| 220 | + statement=select(Tag).group_by(Tag.id).filter(Tag.name.in_(followed_tags)), |
| 221 | + method="scalars", |
| 222 | + ) if followed_tags else list(), |
| 223 | + }.items() |
| 224 | + ))) |
| 225 | + await self.create(user) |
| 226 | + return user.id |
| 227 | + |
| 228 | + async def delete_user(self, user_id: int) -> None: |
| 229 | + logger.info("delete_user DAO request") |
| 230 | + await self.delete_(obj_id=user_id) |
| 231 | + |
| 232 | + async def __aenter__(self) -> Self: |
| 233 | + return self |
| 234 | + |
| 235 | + async def __aexit__( |
| 236 | + self, |
| 237 | + exc_type: type[Exception], |
| 238 | + exc_val: Exception, |
| 239 | + exc_tb: TracebackType, |
| 240 | + ) -> None: |
| 241 | + await self.commit() |
| 242 | + await self.close() |
| 243 | + |
| 244 | + |
| 245 | +def get_postgres_dao_implementation() -> PostgresDAOImplementation: |
| 246 | + """ |
| 247 | + :return: instance of PostgresDAOImplementation |
| 248 | + """ |
| 249 | + return PostgresDAOImplementation() |
0 commit comments