Skip to content

Commit 4cbe22e

Browse files
sattvikctamassolteszanku255
authored
feat: multithreaded bulk import (#162) (#167)
* feat: Add BulkImport APIs and cron * fix: PR changes * fix: PR changes * fix: Update version and changelog * fix: PR changes * fix: PR changes * fix: fixing transaction rolled back issues with multithreaded bulk import * fix: changelog * fix: reusing gson object * feat: bulk inserting the bulk migration data * fix: fixes and error handling changes * fix: fixing tests * chore: update build version and changelog * fix: adding exeption thrown declaration for bulk import and mysql --------- Co-authored-by: Tamas Soltesz <[email protected]> Co-authored-by: Ankit Tiwari <[email protected]>
1 parent aa11e40 commit 4cbe22e

27 files changed

+615
-2
lines changed

CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## [Unreleased]
99

10+
## [6.4.0]
11+
12+
- Adds support for Bulk Import
13+
- Adds `BulkImportUser` class to represent a bulk import user
14+
- Adds `BulkImportStorage` interface
15+
- Adds `DuplicateUserIdException` class
16+
- Adds `createBulkImportProxyStorageInstance` method in `Storage` class
17+
- Adds `closeConnectionForBulkImportProxyStorage`, `commitTransactionForBulkImportProxyStorage`, and `rollbackTransactionForBulkImportProxyStorage` method in `SQLStorage` class
18+
- Adds `BulkImportTransactionRolledBackException` for signaling if the transaction was rolled back by the DBMS
19+
1020
## [6.3.0] - 2024-10-02
1121

1222
- Adds `OAuthStorage` interface for OAuth Provider support

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ plugins {
22
id 'java-library'
33
}
44

5-
version = "6.3.0"
5+
version = "6.4.0"
66

77
repositories {
88
mavenCentral()

src/main/java/io/supertokens/pluginInterface/Storage.java

+6
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@
2626
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
2727

2828
import java.util.List;
29+
import java.util.Map;
2930
import java.util.Set;
3031

3132
public interface Storage {
3233

3334
// if silent is true, do not log anything out on the console
3435
void constructor(String processId, boolean silent, boolean isTesting);
3536

37+
Storage createBulkImportProxyStorageInstance();
38+
3639
void loadConfig(JsonObject jsonConfig, Set<LOG_LEVEL> logLevels, TenantIdentifier tenantIdentifier)
3740
throws InvalidConfigException;
3841

@@ -78,6 +81,9 @@ void setKeyValue(TenantIdentifier tenantIdentifier, String key, KeyValueInfo inf
7881
boolean isUserIdBeingUsedInNonAuthRecipe(AppIdentifier appIdentifier, String className, String userId)
7982
throws StorageQueryException;
8083

84+
Map<String, List<String>> findNonAuthRecipesWhereForUserIdsUsed(AppIdentifier appIdentifier, List<String> userIds)
85+
throws StorageQueryException;
86+
8187
// to be used for testing purposes only. This function will add dummy data to non-auth tables.
8288
void addInfoToNonAuthRecipesBasedOnUserId(TenantIdentifier tenantIdentifier, String className, String userId)
8389
throws StorageQueryException;

src/main/java/io/supertokens/pluginInterface/StorageUtils.java

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.supertokens.pluginInterface;
1818

1919
import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
20+
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
2021
import io.supertokens.pluginInterface.dashboard.sqlStorage.DashboardSQLStorage;
2122
import io.supertokens.pluginInterface.emailpassword.sqlStorage.EmailPasswordSQLStorage;
2223
import io.supertokens.pluginInterface.emailverification.sqlStorage.EmailVerificationSQLStorage;
@@ -134,6 +135,15 @@ public static MultitenancyStorage getMultitenancyStorage(Storage storage) {
134135
return (MultitenancyStorage) storage;
135136
}
136137

138+
public static BulkImportSQLStorage getBulkImportStorage(Storage storage) {
139+
if (storage.getType() != STORAGE_TYPE.SQL) {
140+
// we only support SQL for now
141+
throw new UnsupportedOperationException("");
142+
}
143+
144+
return (BulkImportSQLStorage) storage;
145+
}
146+
137147
public static OAuthStorage getOAuthStorage(Storage storage) {
138148
if (storage.getType() != STORAGE_TYPE.SQL) {
139149
// we only support SQL for now

src/main/java/io/supertokens/pluginInterface/authRecipe/AuthRecipeStorage.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import javax.annotation.Nonnull;
2828
import javax.annotation.Nullable;
29+
import java.util.List;
2930

3031
public interface AuthRecipeStorage extends Storage {
3132

@@ -45,6 +46,8 @@ AuthRecipeUserInfo[] getUsers(TenantIdentifier tenantIdentifier, @Nonnull Intege
4546

4647
boolean doesUserIdExist(TenantIdentifier tenantIdentifierIdentifier, String userId) throws StorageQueryException;
4748

49+
List<String> findExistingUserIds(AppIdentifier appIdentifier, List<String> userIds) throws StorageQueryException;
50+
4851
AuthRecipeUserInfo getPrimaryUserById(AppIdentifier appIdentifier, String userId) throws StorageQueryException;
4952

5053
String getPrimaryUserIdStrForUserId(AppIdentifier appIdentifier, String userId) throws StorageQueryException;

src/main/java/io/supertokens/pluginInterface/authRecipe/sqlStorage/AuthRecipeSQLStorage.java

+20
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,19 @@
2323
import io.supertokens.pluginInterface.sqlStorage.SQLStorage;
2424
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
2525

26+
import java.util.List;
27+
import java.util.Map;
28+
2629
public interface AuthRecipeSQLStorage extends AuthRecipeStorage, SQLStorage {
2730

2831
AuthRecipeUserInfo getPrimaryUserById_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
2932
String userId)
3033
throws StorageQueryException;
3134

35+
List<AuthRecipeUserInfo> getPrimaryUsersByIds_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
36+
List<String> userIds)
37+
throws StorageQueryException;
38+
3239
// lock order:
3340
// - emailpassword table
3441
// - thirdparty table
@@ -38,6 +45,13 @@ AuthRecipeUserInfo[] listPrimaryUsersByEmail_Transaction(AppIdentifier appIdenti
3845
String email)
3946
throws StorageQueryException;
4047

48+
//helper method for bulk import
49+
AuthRecipeUserInfo[] listPrimaryUsersByMultipleEmailsOrPhoneNumbersOrThirdparty_Transaction(AppIdentifier appIdentifier,
50+
TransactionConnection con,
51+
List<String> emails, List<String> phones,
52+
Map<String, String> thirdpartyIdToThirdpartyUserId)
53+
throws StorageQueryException;
54+
4155
// locks only passwordless table
4256
AuthRecipeUserInfo[] listPrimaryUsersByPhoneNumber_Transaction(AppIdentifier appIdentifier,
4357
TransactionConnection con, String phoneNumber)
@@ -52,9 +66,15 @@ AuthRecipeUserInfo[] listPrimaryUsersByThirdPartyInfo_Transaction(AppIdentifier
5266
void makePrimaryUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String userId)
5367
throws StorageQueryException;
5468

69+
void makePrimaryUsers_Transaction(AppIdentifier appIdentifier, TransactionConnection con, List<String> userIds)
70+
throws StorageQueryException;
71+
5572
void linkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String recipeUserId,
5673
String primaryUserId) throws StorageQueryException;
5774

75+
void linkMultipleAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
76+
Map<String, String> recipeUserIdByPrimaryUserId) throws StorageQueryException;
77+
5878
void unlinkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String primaryUserId,
5979
String recipeUserId)
6080
throws StorageQueryException;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
3+
*
4+
* This software is licensed under the Apache License, Version 2.0 (the
5+
* "License") as published by the Apache Software Foundation.
6+
*
7+
* You may not use this file except in compliance with the License. You may
8+
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package io.supertokens.pluginInterface.bulkimport;
18+
19+
import java.util.List;
20+
21+
import javax.annotation.Nonnull;
22+
import javax.annotation.Nullable;
23+
24+
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
25+
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
26+
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
27+
import io.supertokens.pluginInterface.nonAuthRecipe.NonAuthRecipeStorage;
28+
29+
public interface BulkImportStorage extends NonAuthRecipeStorage {
30+
/**
31+
* Add users to the bulk_import_users table
32+
*/
33+
void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
34+
throws StorageQueryException,
35+
TenantOrAppNotFoundException,
36+
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException;
37+
38+
/**
39+
* Get users from the bulk_import_users table
40+
*/
41+
List<BulkImportUser> getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
42+
@Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException;
43+
44+
/**
45+
* Delete users by id from the bulk_import_users table
46+
*/
47+
List<String> deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws StorageQueryException;
48+
49+
/**
50+
* Returns the users from the bulk_import_users table for processing
51+
*/
52+
List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException;
53+
54+
55+
/**
56+
* Update the bulk_import_user's primary_user_id by bulk_import_user_id
57+
*/
58+
void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException;
59+
60+
/**
61+
* Returns the count of users from the bulk_import_users table
62+
*/
63+
long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException;
64+
65+
public enum BULK_IMPORT_USER_STATUS {
66+
NEW, PROCESSING, FAILED
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
3+
*
4+
* This software is licensed under the Apache License, Version 2.0 (the
5+
* "License") as published by the Apache Software Foundation.
6+
*
7+
* You may not use this file except in compliance with the License. You may
8+
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package io.supertokens.pluginInterface.bulkimport;
18+
19+
import java.util.List;
20+
21+
import com.google.gson.Gson;
22+
import com.google.gson.JsonObject;
23+
24+
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
25+
26+
public class BulkImportUser {
27+
public String id;
28+
public String externalUserId;
29+
public JsonObject userMetadata;
30+
public List<UserRole> userRoles;
31+
public List<TotpDevice> totpDevices;
32+
public List<LoginMethod> loginMethods;
33+
34+
// Following fields come from the DB Record.
35+
public BULK_IMPORT_USER_STATUS status;
36+
public String primaryUserId;
37+
public String errorMessage;
38+
public Long createdAt;
39+
public Long updatedAt;
40+
41+
private static final Gson gson = new Gson();
42+
43+
public BulkImportUser(String id, String externalUserId, JsonObject userMetadata, List<UserRole> userRoles,
44+
List<TotpDevice> totpDevices, List<LoginMethod> loginMethods) {
45+
this.id = id;
46+
this.externalUserId = externalUserId;
47+
this.userMetadata = userMetadata;
48+
this.userRoles = userRoles;
49+
this.totpDevices = totpDevices;
50+
this.loginMethods = loginMethods;
51+
}
52+
53+
public static BulkImportUser forTesting_fromJson(JsonObject jsonObject) {
54+
return gson.fromJson(jsonObject, BulkImportUser.class);
55+
}
56+
57+
// The bulk_import_users table stores users to be imported via a Cron Job.
58+
// It has a `raw_data` column containing user data in JSON format.
59+
60+
// The BulkImportUser class represents this `raw_data`, including additional fields like `status`, `createdAt`, and `updatedAt`.
61+
// First, we validate all fields of `raw_data` using the BulkImportUser class, then store this data in the bulk_import_users table.
62+
// This function retrieves the `raw_data` after removing the additional fields.
63+
public String toRawDataForDbStorage() {
64+
JsonObject jsonObject = gson.fromJson(new Gson().toJson(this), JsonObject.class);
65+
jsonObject.remove("status");
66+
jsonObject.remove("createdAt");
67+
jsonObject.remove("updatedAt");
68+
return jsonObject.toString();
69+
}
70+
71+
// The bulk_import_users table contains a `raw_data` column with user data in JSON format, along with other columns such as `id`, `status`, `primary_user_id`, and `error_msg` etc.
72+
73+
// When creating an instance of the BulkImportUser class, the extra fields must be passed separately as they are not part of the `raw_data`.
74+
// This function creates a BulkImportUser instance from a stored bulk_import_user entry.
75+
public static BulkImportUser fromRawDataFromDbStorage(String id, String rawData, BULK_IMPORT_USER_STATUS status, String primaryUserId, String errorMessage, long createdAt, long updatedAt) {
76+
BulkImportUser user = gson.fromJson(rawData, BulkImportUser.class);
77+
user.id = id;
78+
user.status = status;
79+
user.primaryUserId = primaryUserId;
80+
user.errorMessage = errorMessage;
81+
user.createdAt = createdAt;
82+
user.updatedAt = updatedAt;
83+
return user;
84+
}
85+
86+
public JsonObject toJsonObject() {
87+
return gson.fromJson(gson.toJson(this), JsonObject.class);
88+
}
89+
90+
public static class UserRole {
91+
public String role;
92+
public List<String> tenantIds;
93+
94+
public UserRole(String role, List<String> tenantIds) {
95+
this.role = role;
96+
this.tenantIds = tenantIds;
97+
}
98+
}
99+
100+
public static class TotpDevice {
101+
public String secretKey;
102+
public int period;
103+
public int skew;
104+
public String deviceName;
105+
106+
public TotpDevice(String secretKey, int period, int skew, String deviceName) {
107+
this.secretKey = secretKey;
108+
this.period = period;
109+
this.skew = skew;
110+
this.deviceName = deviceName;
111+
}
112+
}
113+
114+
public static class LoginMethod {
115+
public List<String> tenantIds;
116+
public boolean isVerified;
117+
public boolean isPrimary;
118+
public long timeJoinedInMSSinceEpoch;
119+
public String recipeId;
120+
public String email;
121+
public String passwordHash;
122+
public String hashingAlgorithm;
123+
public String plainTextPassword;
124+
public String thirdPartyId;
125+
public String thirdPartyUserId;
126+
public String phoneNumber;
127+
public String superTokensUserId;
128+
public String externalUserId;
129+
130+
public String getSuperTokenOrExternalUserId() {
131+
return this.externalUserId != null ? this.externalUserId : this.superTokensUserId;
132+
}
133+
134+
public LoginMethod(List<String> tenantIds, String recipeId, boolean isVerified, boolean isPrimary,
135+
long timeJoinedInMSSinceEpoch, String email, String passwordHash, String hashingAlgorithm, String plainTextPassword,
136+
String thirdPartyId, String thirdPartyUserId, String phoneNumber) {
137+
this.tenantIds = tenantIds;
138+
this.recipeId = recipeId;
139+
this.isVerified = isVerified;
140+
this.isPrimary = isPrimary;
141+
this.timeJoinedInMSSinceEpoch = timeJoinedInMSSinceEpoch;
142+
this.email = email;
143+
this.passwordHash = passwordHash;
144+
this.hashingAlgorithm = hashingAlgorithm;
145+
this.plainTextPassword = plainTextPassword;
146+
this.thirdPartyId = thirdPartyId;
147+
this.thirdPartyUserId = thirdPartyUserId;
148+
this.phoneNumber = phoneNumber;
149+
}
150+
}
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
3+
*
4+
* This software is licensed under the Apache License, Version 2.0 (the
5+
* "License") as published by the Apache Software Foundation.
6+
*
7+
* You may not use this file except in compliance with the License. You may
8+
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package io.supertokens.pluginInterface.bulkimport;
18+
19+
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
20+
21+
public class ImportUserBase {
22+
23+
public String userId;
24+
public String email;
25+
public TenantIdentifier tenantIdentifier;
26+
public long timeJoinedMSSinceEpoch;
27+
28+
public ImportUserBase(String userId, String email, TenantIdentifier tenantIdentifier, long timeJoinedMSSinceEpoch) {
29+
this.userId = userId; //this will be the supertokens userId.
30+
this.email = email;
31+
this.tenantIdentifier = tenantIdentifier;
32+
this.timeJoinedMSSinceEpoch = timeJoinedMSSinceEpoch;
33+
}
34+
}

0 commit comments

Comments
 (0)