Skip to content

Commit c444c6f

Browse files
committed
Expand the return value of /user/stats to include total values.
1 parent d1c3514 commit c444c6f

File tree

7 files changed

+469
-331
lines changed

7 files changed

+469
-331
lines changed

api/handlers.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ type (
9191
Items []database.UploadResponse `json:"items"`
9292
Offset int `json:"offset"`
9393
PageSize int `json:"pageSize"`
94-
Count int `json:"count"`
94+
Count int64 `json:"count"`
9595
}
9696
// UserGET defines a representation of the User struct returned by all
9797
// handlers. This allows us to tweak the fields of the struct before
@@ -1255,13 +1255,13 @@ func (api *API) userUploadsDELETE(u *database.User, w http.ResponseWriter, req *
12551255
// and sets the QuotaExceeded flag on their account if they exceed any.
12561256
func (api *API) checkUserQuotas(ctx context.Context, u *database.User) {
12571257
startOfTime := time.Time{}
1258-
numUploads, storageUsed, _, _, err := api.staticDB.UserUploadStats(ctx, u.ID, startOfTime)
1258+
upStats, err := api.staticDB.UserStatsUpload(ctx, u.ID, startOfTime)
12591259
if err != nil {
12601260
api.staticLogger.Debugln("Failed to get user's upload bandwidth used:", err)
12611261
return
12621262
}
12631263
quota := database.UserLimits[u.Tier]
1264-
quotaExceeded := numUploads > quota.MaxNumberUploads || storageUsed > quota.Storage
1264+
quotaExceeded := upStats.CountTotal > int64(quota.MaxNumberUploads) || upStats.SizeTotal > quota.Storage
12651265
if quotaExceeded != u.QuotaExceeded {
12661266
u.QuotaExceeded = quotaExceeded
12671267
err = api.staticDB.UserSave(ctx, u)

database/upload.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (db *DB) UploadCreate(ctx context.Context, user User, ip string, skylink Sk
6464

6565
// UploadsBySkylink fetches a page of uploads of this skylink and the total
6666
// number of such uploads.
67-
func (db *DB) UploadsBySkylink(ctx context.Context, skylink Skylink, offset, pageSize int) ([]UploadResponse, int, error) {
67+
func (db *DB) UploadsBySkylink(ctx context.Context, skylink Skylink, offset, pageSize int) ([]UploadResponse, int64, error) {
6868
if skylink.ID.IsZero() {
6969
return nil, 0, ErrInvalidSkylink
7070
}
@@ -119,7 +119,7 @@ func (db *DB) UnpinUploads(ctx context.Context, skylink Skylink, user User) (int
119119

120120
// UploadsByUser fetches a page of uploads by this user and the total number of
121121
// such uploads.
122-
func (db *DB) UploadsByUser(ctx context.Context, user User, offset, pageSize int) ([]UploadResponse, int, error) {
122+
func (db *DB) UploadsByUser(ctx context.Context, user User, offset, pageSize int) ([]UploadResponse, int64, error) {
123123
if user.ID.IsZero() {
124124
return nil, 0, errors.New("invalid user")
125125
}
@@ -135,7 +135,7 @@ func (db *DB) UploadsByUser(ctx context.Context, user User, offset, pageSize int
135135

136136
// uploadsBy fetches a page of uploads, filtered by an arbitrary match criteria.
137137
// It also reports the total number of records in the list.
138-
func (db *DB) uploadsBy(ctx context.Context, matchStage bson.D, offset, pageSize int) ([]UploadResponse, int, error) {
138+
func (db *DB) uploadsBy(ctx context.Context, matchStage bson.D, offset, pageSize int) ([]UploadResponse, int64, error) {
139139
if err := validateOffsetPageSize(offset, pageSize); err != nil {
140140
return nil, 0, err
141141
}
@@ -156,7 +156,7 @@ func (db *DB) uploadsBy(ctx context.Context, matchStage bson.D, offset, pageSize
156156
for ix := range uploads {
157157
uploads[ix].RawStorage = skynet.RawStorageUsed(uploads[ix].Size)
158158
}
159-
return uploads, int(cnt), nil
159+
return uploads, cnt, nil
160160
}
161161

162162
// validateOffsetPageSize returns an error if offset and/or page size are invalid.

database/user.go

-268
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"crypto/subtle"
66
"fmt"
77
"net/mail"
8-
"sync"
98
"time"
109

1110
"github.com/SkynetLabs/skynet-accounts/hash"
@@ -127,20 +126,6 @@ type (
127126
QuotaExceeded bool `bson:"quota_exceeded" json:"quotaExceeded"`
128127
PubKeys []PubKey `bson:"pub_keys" json:"-"`
129128
}
130-
// UserStats contains statistical information about the user.
131-
UserStats struct {
132-
RawStorageUsed int64 `json:"rawStorageUsed"`
133-
NumRegReads int64 `json:"numRegReads"`
134-
NumRegWrites int64 `json:"numRegWrites"`
135-
NumUploads int `json:"numUploads"`
136-
NumDownloads int `json:"numDownloads"`
137-
TotalUploadsSize int64 `json:"totalUploadsSize"`
138-
TotalDownloadsSize int64 `json:"totalDownloadsSize"`
139-
BandwidthUploads int64 `json:"bwUploads"`
140-
BandwidthDownloads int64 `json:"bwDownloads"`
141-
BandwidthRegReads int64 `json:"bwRegReads"`
142-
BandwidthRegWrites int64 `json:"bwRegWrites"`
143-
}
144129
// TierLimits defines the speed limits imposed on the user based on their
145130
// tier.
146131
TierLimits struct {
@@ -577,11 +562,6 @@ func (db *DB) UserSetTier(ctx context.Context, u *User, t int) error {
577562
return nil
578563
}
579564

580-
// UserStats returns statistical information about the user.
581-
func (db *DB) UserStats(ctx context.Context, user User) (*UserStats, error) {
582-
return db.userStats(ctx, user)
583-
}
584-
585565
// Ping sends a ping command to verify that the client can connect to the DB and
586566
// specifically to the primary.
587567
func (db *DB) Ping(ctx context.Context) error {
@@ -635,254 +615,6 @@ func (db *DB) managedUserBySub(ctx context.Context, sub string) (*User, error) {
635615
return &u, nil
636616
}
637617

638-
// userStats reports statistical information about the user.
639-
func (db *DB) userStats(ctx context.Context, user User) (*UserStats, error) {
640-
stats := UserStats{}
641-
var errs []error
642-
var errsMux sync.Mutex
643-
regErr := func(msg string, e error) {
644-
db.staticLogger.Infoln(msg, e)
645-
errsMux.Lock()
646-
errs = append(errs, e)
647-
errsMux.Unlock()
648-
}
649-
startOfMonth := monthStart(user.SubscribedUntil)
650-
651-
var wg sync.WaitGroup
652-
wg.Add(1)
653-
go func() {
654-
defer wg.Done()
655-
n, size, rawStorage, bw, err := db.UserUploadStats(ctx, user.ID, startOfMonth)
656-
if err != nil {
657-
regErr("Failed to get user's upload bandwidth used:", err)
658-
return
659-
}
660-
stats.NumUploads = n
661-
stats.TotalUploadsSize = size
662-
stats.RawStorageUsed = rawStorage
663-
stats.BandwidthUploads = bw
664-
db.staticLogger.Tracef("User %s upload bandwidth: %v", user.ID.Hex(), bw)
665-
}()
666-
wg.Add(1)
667-
go func() {
668-
defer wg.Done()
669-
n, size, bw, err := db.userDownloadStats(ctx, user.ID, startOfMonth)
670-
if err != nil {
671-
regErr("Failed to get user's download bandwidth used:", err)
672-
return
673-
}
674-
stats.NumDownloads = n
675-
stats.TotalDownloadsSize = size
676-
stats.BandwidthDownloads = bw
677-
db.staticLogger.Tracef("User %s download bandwidth: %v", user.ID.Hex(), bw)
678-
}()
679-
wg.Add(1)
680-
go func() {
681-
defer wg.Done()
682-
n, bw, err := db.userRegistryWriteStats(ctx, user.ID, startOfMonth)
683-
if err != nil {
684-
regErr("Failed to get user's registry write bandwidth used:", err)
685-
return
686-
}
687-
stats.NumRegWrites = n
688-
stats.BandwidthRegWrites = bw
689-
db.staticLogger.Tracef("User %s registry write bandwidth: %v", user.ID.Hex(), bw)
690-
}()
691-
wg.Add(1)
692-
go func() {
693-
defer wg.Done()
694-
n, bw, err := db.userRegistryReadStats(ctx, user.ID, startOfMonth)
695-
if err != nil {
696-
regErr("Failed to get user's registry read bandwidth used:", err)
697-
return
698-
}
699-
stats.NumRegReads = n
700-
stats.BandwidthRegReads = bw
701-
db.staticLogger.Tracef("User %s registry read bandwidth: %v", user.ID.Hex(), bw)
702-
}()
703-
704-
wg.Wait()
705-
if len(errs) > 0 {
706-
return nil, errors.Compose(errs...)
707-
}
708-
return &stats, nil
709-
}
710-
711-
// UserUploadStats reports on the user's uploads - count, total size and total
712-
// bandwidth used. It uses the total size of the uploaded skyfiles as basis.
713-
func (db *DB) UserUploadStats(ctx context.Context, id primitive.ObjectID, since time.Time) (count int, totalSize int64, rawStorageUsed int64, totalBandwidth int64, err error) {
714-
matchStage := bson.D{{"$match", bson.M{"user_id": id}}}
715-
lookupStage := bson.D{
716-
{"$lookup", bson.D{
717-
{"from", "skylinks"},
718-
{"localField", "skylink_id"},
719-
{"foreignField", "_id"},
720-
{"as", "skylink_data"},
721-
}},
722-
}
723-
replaceStage := bson.D{
724-
{"$replaceRoot", bson.D{
725-
{"newRoot", bson.D{
726-
{"$mergeObjects", bson.A{
727-
bson.D{{"$arrayElemAt", bson.A{"$skylink_data", 0}}}, "$$ROOT"},
728-
},
729-
}},
730-
}},
731-
}
732-
// These are the fields we don't need.
733-
projectStage := bson.D{{"$project", bson.D{
734-
{"_id", 0},
735-
{"user_id", 0},
736-
{"skylink_data", 0},
737-
{"name", 0},
738-
{"skylink_id", 0},
739-
}}}
740-
741-
pipeline := mongo.Pipeline{matchStage, lookupStage, replaceStage, projectStage}
742-
c, err := db.staticUploads.Aggregate(ctx, pipeline)
743-
if err != nil {
744-
return
745-
}
746-
defer func() {
747-
if errDef := c.Close(ctx); errDef != nil {
748-
db.staticLogger.Traceln("Error on closing DB cursor.", errDef)
749-
}
750-
}()
751-
752-
// We need this struct, so we can safely decode both int32 and int64.
753-
result := struct {
754-
Size int64 `bson:"size"`
755-
Skylink string `bson:"skylink"`
756-
Unpinned bool `bson:"unpinned"`
757-
Timestamp time.Time `bson:"timestamp"`
758-
}{}
759-
processedSkylinks := make(map[string]bool)
760-
for c.Next(ctx) {
761-
if err = c.Decode(&result); err != nil {
762-
err = errors.AddContext(err, "failed to decode DB data")
763-
return
764-
}
765-
// We first weed out any old uploads that we fetch only in order to
766-
// calculate the total used storage.
767-
if result.Timestamp.Before(since) {
768-
if result.Unpinned || processedSkylinks[result.Skylink] {
769-
continue
770-
}
771-
processedSkylinks[result.Skylink] = true
772-
totalSize += result.Size
773-
continue
774-
}
775-
// All bandwidth is counted, regardless of unpinned status.
776-
totalBandwidth += skynet.BandwidthUploadCost(result.Size)
777-
// Count only uploads that are still pinned towards total count.
778-
if result.Unpinned {
779-
continue
780-
}
781-
count++
782-
// Count only unique uploads towards total size and used storage.
783-
if processedSkylinks[result.Skylink] {
784-
continue
785-
}
786-
processedSkylinks[result.Skylink] = true
787-
totalSize += result.Size
788-
rawStorageUsed += skynet.RawStorageUsed(result.Size)
789-
}
790-
return count, totalSize, rawStorageUsed, totalBandwidth, nil
791-
}
792-
793-
// userDownloadStats reports on the user's downloads - count, total size and
794-
// total bandwidth used. It uses the actual bandwidth used, as reported by nginx.
795-
func (db *DB) userDownloadStats(ctx context.Context, id primitive.ObjectID, monthStart time.Time) (count int, totalSize int64, totalBandwidth int64, err error) {
796-
matchStage := bson.D{{"$match", bson.D{
797-
{"user_id", id},
798-
{"created_at", bson.D{{"$gt", monthStart}}},
799-
}}}
800-
lookupStage := bson.D{
801-
{"$lookup", bson.D{
802-
{"from", "skylinks"},
803-
{"localField", "skylink_id"}, // field in the downloads collection
804-
{"foreignField", "_id"}, // field in the skylinks collection
805-
{"as", "fromSkylinks"},
806-
}},
807-
}
808-
replaceStage := bson.D{
809-
{"$replaceRoot", bson.D{
810-
{"newRoot", bson.D{
811-
{"$mergeObjects", bson.A{
812-
bson.D{{"$arrayElemAt", bson.A{"$fromSkylinks", 0}}}, "$$ROOT"},
813-
},
814-
}},
815-
}},
816-
}
817-
// This stage checks if the download has a non-zero `bytes` field and if so,
818-
// it takes it as the download's size. Otherwise, it reports the full
819-
// skylink's size as download's size.
820-
projectStage := bson.D{{"$project", bson.D{
821-
{"size", bson.D{
822-
{"$cond", bson.A{
823-
bson.D{{"$gt", bson.A{"$bytes", 0}}}, // if
824-
"$bytes", // then
825-
"$size", // else
826-
}},
827-
}},
828-
}}}
829-
830-
pipeline := mongo.Pipeline{matchStage, lookupStage, replaceStage, projectStage}
831-
c, err := db.staticDownloads.Aggregate(ctx, pipeline)
832-
if err != nil {
833-
err = errors.AddContext(err, "DB query failed")
834-
return
835-
}
836-
defer func() {
837-
if errDef := c.Close(ctx); errDef != nil {
838-
db.staticLogger.Traceln("Error on closing DB cursor.", errDef)
839-
}
840-
}()
841-
842-
// We need this struct, so we can safely decode both int32 and int64.
843-
result := struct {
844-
Size int64 `bson:"size"`
845-
}{}
846-
for c.Next(ctx) {
847-
if err = c.Decode(&result); err != nil {
848-
err = errors.AddContext(err, "failed to decode DB data")
849-
return
850-
}
851-
count++
852-
totalSize += result.Size
853-
totalBandwidth += skynet.BandwidthDownloadCost(result.Size)
854-
}
855-
return count, totalSize, totalBandwidth, nil
856-
}
857-
858-
// userRegistryWriteStats reports the number of registry writes by the user and
859-
// the bandwidth used.
860-
func (db *DB) userRegistryWriteStats(ctx context.Context, userID primitive.ObjectID, monthStart time.Time) (int64, int64, error) {
861-
matchStage := bson.D{{"$match", bson.D{
862-
{"user_id", userID},
863-
{"timestamp", bson.D{{"$gt", monthStart}}},
864-
}}}
865-
writes, err := db.count(ctx, db.staticRegistryWrites, matchStage)
866-
if err != nil {
867-
return 0, 0, errors.AddContext(err, "failed to fetch registry write bandwidth")
868-
}
869-
return writes, writes * skynet.CostBandwidthRegistryWrite, nil
870-
}
871-
872-
// userRegistryReadsStats reports the number of registry reads by the user and
873-
// the bandwidth used.
874-
func (db *DB) userRegistryReadStats(ctx context.Context, userID primitive.ObjectID, monthStart time.Time) (int64, int64, error) {
875-
matchStage := bson.D{{"$match", bson.D{
876-
{"user_id", userID},
877-
{"timestamp", bson.D{{"$gt", monthStart}}},
878-
}}}
879-
reads, err := db.count(ctx, db.staticRegistryReads, matchStage)
880-
if err != nil {
881-
return 0, 0, errors.AddContext(err, "failed to fetch registry read bandwidth")
882-
}
883-
return reads, reads * skynet.CostBandwidthRegistryRead, nil
884-
}
885-
886618
// HasKey checks if the given pubkey is among the pubkeys registered for the
887619
// user.
888620
func (u User) HasKey(pk PubKey) bool {

0 commit comments

Comments
 (0)