Skip to content

fix: bulk migration stuck in processing #263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [9.0.2]
## [9.0.3]

- Fixes BatchUpdateException checks and error handling to prevent bulk import users stuck in `PROCESSING` state

## [9.0.2]

- Fixes `AuthRecipe#getUserByAccountInfo` to consider the tenantId instead of the appId when fetching the webauthn user
- Changes dependency structure to avoid multiple dependency declarations for the same library

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ plugins {
id 'java-library'
}

version = "9.0.2"
version = "9.0.3"

repositories {
mavenCentral()
Expand Down
131 changes: 76 additions & 55 deletions src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -1046,10 +1046,9 @@ public void signUpMultipleViaBulkImport_Transaction(TransactionConnection connec
try {
Connection sqlConnection = (Connection) connection.getConnection();
EmailPasswordQueries.signUpMultipleForBulkImport_Transaction(this, sqlConnection, users);
} catch (StorageQueryException | SQLException | StorageTransactionLogicException e) {
} catch (StorageQueryException | StorageTransactionLogicException e) {
Throwable actual = e.getCause();
if (actual instanceof BatchUpdateException) {
BatchUpdateException batchUpdateException = (BatchUpdateException) actual;
if (actual instanceof BatchUpdateException batchUpdateException) {
Map<String, Exception> errorByPosition = new HashMap<>();
SQLException nextException = batchUpdateException.getNextException();
while (nextException != null) {
Expand Down Expand Up @@ -1271,6 +1270,15 @@ public void updateIsEmailVerified_Transaction(AppIdentifier appIdentifier, Trans
}
}

/**
* Update the isEmailVerified column for multiple users in the email verification table. This method is used in the
* Bulk Migration process.
* Important note: this method expects a Map of email to userId, but if there is an error in the batch processing,
* it will throw a BulkImportBatchInsertException with a map of userid to Exception, based on the position of the
* erroneous item in the batch.
* This means, that the underlying map implementation must be one that preserves iteration order (LinkedHashMap
* is a good choice) and this is the responsibility of the caller to ensure that the passed map is such.
*/
@Override
public void updateMultipleIsEmailVerified_Transaction(AppIdentifier appIdentifier, TransactionConnection con,
Map<String, String> emailToUserId, boolean isEmailVerified)
Expand All @@ -1280,23 +1288,35 @@ public void updateMultipleIsEmailVerified_Transaction(AppIdentifier appIdentifie
EmailVerificationQueries.updateMultipleUsersIsEmailVerified_Transaction(this, sqlCon, appIdentifier,
emailToUserId, isEmailVerified);
} catch (SQLException e) {
if (e instanceof PSQLException) {
PostgreSQLConfig config = Config.getConfig(this);
ServerErrorMessage serverMessage = ((PSQLException) e).getServerErrorMessage();
if (e instanceof BatchUpdateException batchUpdateException) {
SQLException nextException = batchUpdateException.getNextException();
Map<String, Exception> errorByPosition = new HashMap<>();
while (nextException != null) {

if (isForeignKeyConstraintError(serverMessage, config.getEmailVerificationTable(), "app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
}
}
if (nextException instanceof PSQLException) {
PostgreSQLConfig config = Config.getConfig(this);
ServerErrorMessage serverMessage = ((PSQLException) nextException).getServerErrorMessage();

boolean isPSQLPrimKeyError = e instanceof PSQLException && isPrimaryKeyError(
((PSQLException) e).getServerErrorMessage(),
Config.getConfig(this).getEmailVerificationTable());
int position = getErroneousEntryPosition(batchUpdateException);
String userid = ((Map.Entry<String, String>) emailToUserId.entrySet().toArray()[position]).getKey();
if (isNullConstraintError(serverMessage, config.getEmailVerificationTable(), "email")) {
errorByPosition.put(userid, new NullPointerException("email is null"));
} else if (isPrimaryKeyError(serverMessage, config.getEmailVerificationTable())) {
errorByPosition.put(userid,
new DuplicateEmailException());
}
if (isForeignKeyConstraintError(serverMessage, config.getEmailVerificationTable(),
"app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
}
}

if (!isEmailVerified || !isPSQLPrimKeyError) {
nextException = nextException.getNextException();
}
throw new StorageQueryException(
new BulkImportBatchInsertException("emailverification errors", errorByPosition));
}
throw new StorageQueryException(e);
}
// we do not throw an error since the email is already verified
}
}

Expand Down Expand Up @@ -1499,9 +1519,7 @@ public void importThirdPartyUsers_Transaction(TransactionConnection con,
Connection sqlCon = (Connection) con.getConnection();
ThirdPartyQueries.importUser_Transaction(this, sqlCon, usersToImport);
} catch (SQLException e) {
Throwable actual = e.getCause();
if (actual instanceof BatchUpdateException) {
BatchUpdateException batchUpdateException = (BatchUpdateException) actual;
if (e instanceof BatchUpdateException batchUpdateException) {
Map<String, Exception> errorByPosition = new HashMap<>();
SQLException nextException = batchUpdateException.getNextException();
while (nextException != null) {
Expand Down Expand Up @@ -1769,6 +1787,13 @@ private boolean isForeignKeyConstraintError(ServerErrorMessage serverMessage, St
&& serverMessage.getConstraint().equals(tableName + "_" + columnName + "_fkey");
}

private boolean isNullConstraintError(ServerErrorMessage serverMessage, String tableName, String columnName) {
String[] tableNameParts = tableName.split("\\.");
tableName = tableNameParts[tableNameParts.length - 1];
return serverMessage.getSQLState().equals("23502")
&& serverMessage.getMessage().contains("null value in column \"" + columnName + "\" of relation \"" + tableName + "\" violates not-null constraint");
}

private boolean isPrimaryKeyError(ServerErrorMessage serverMessage, String tableName) {
String[] tableNameParts = tableName.split("\\.");
tableName = tableNameParts[tableNameParts.length - 1];
Expand Down Expand Up @@ -2098,50 +2123,46 @@ public void importPasswordlessUsers_Transaction(TransactionConnection con,
Connection sqlCon = (Connection) con.getConnection();
PasswordlessQueries.importUsers_Transaction(sqlCon, this, users);
} catch (SQLException e) {
if (e instanceof BatchUpdateException) {
Throwable actual = e.getCause();
if (actual instanceof BatchUpdateException) {
BatchUpdateException batchUpdateException = (BatchUpdateException) actual;
Map<String, Exception> errorByPosition = new HashMap<>();
SQLException nextException = batchUpdateException.getNextException();
while (nextException != null) {
if (e instanceof BatchUpdateException batchUpdateException) {
Map<String, Exception> errorByPosition = new HashMap<>();
SQLException nextException = batchUpdateException.getNextException();
while (nextException != null) {

if (nextException instanceof PSQLException) {
PostgreSQLConfig config = Config.getConfig(this);
ServerErrorMessage serverMessage = ((PSQLException) nextException).getServerErrorMessage();
if (nextException instanceof PSQLException) {
PostgreSQLConfig config = Config.getConfig(this);
ServerErrorMessage serverMessage = ((PSQLException) nextException).getServerErrorMessage();

int position = getErroneousEntryPosition(batchUpdateException);
int position = getErroneousEntryPosition(batchUpdateException);

if (isPrimaryKeyError(serverMessage, config.getPasswordlessUsersTable())
|| isPrimaryKeyError(serverMessage, config.getUsersTable())
|| isPrimaryKeyError(serverMessage, config.getPasswordlessUserToTenantTable())
|| isPrimaryKeyError(serverMessage, config.getAppIdToUserIdTable())) {
errorByPosition.put(users.get(position).userId, new DuplicateUserIdException());
}
if (isUniqueConstraintError(serverMessage, config.getPasswordlessUserToTenantTable(),
"email")) {
errorByPosition.put(users.get(position).userId, new DuplicateEmailException());
if (isPrimaryKeyError(serverMessage, config.getPasswordlessUsersTable())
|| isPrimaryKeyError(serverMessage, config.getUsersTable())
|| isPrimaryKeyError(serverMessage, config.getPasswordlessUserToTenantTable())
|| isPrimaryKeyError(serverMessage, config.getAppIdToUserIdTable())) {
errorByPosition.put(users.get(position).userId, new DuplicateUserIdException());
}
if (isUniqueConstraintError(serverMessage, config.getPasswordlessUserToTenantTable(),
"email")) {
errorByPosition.put(users.get(position).userId, new DuplicateEmailException());

} else if (isUniqueConstraintError(serverMessage, config.getPasswordlessUserToTenantTable(),
"phone_number")) {
errorByPosition.put(users.get(position).userId, new DuplicatePhoneNumberException());
} else if (isUniqueConstraintError(serverMessage, config.getPasswordlessUserToTenantTable(),
"phone_number")) {
errorByPosition.put(users.get(position).userId, new DuplicatePhoneNumberException());

} else if (isForeignKeyConstraintError(serverMessage, config.getAppIdToUserIdTable(),
"app_id")) {
throw new TenantOrAppNotFoundException(users.get(position).tenantIdentifier.toAppIdentifier());
} else if (isForeignKeyConstraintError(serverMessage, config.getAppIdToUserIdTable(),
"app_id")) {
throw new TenantOrAppNotFoundException(users.get(position).tenantIdentifier.toAppIdentifier());

} else if (isForeignKeyConstraintError(serverMessage, config.getUsersTable(),
"tenant_id")) {
throw new TenantOrAppNotFoundException(users.get(position).tenantIdentifier.toAppIdentifier());
}
} else if (isForeignKeyConstraintError(serverMessage, config.getUsersTable(),
"tenant_id")) {
throw new TenantOrAppNotFoundException(users.get(position).tenantIdentifier.toAppIdentifier());
}
nextException = nextException.getNextException();
}
throw new StorageQueryException(
new BulkImportBatchInsertException("passwordless errors", errorByPosition));
nextException = nextException.getNextException();
}
throw new StorageQueryException(e);
throw new StorageQueryException(
new BulkImportBatchInsertException("passwordless errors", errorByPosition));
}
throw new StorageQueryException(e);
}
}

Expand Down Expand Up @@ -3627,8 +3648,7 @@ public void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser>
try {
BulkImportQueries.insertBulkImportUsers_Transaction(this, (Connection) con.getConnection(), appIdentifier, users);
} catch (SQLException e) {
if (e instanceof BatchUpdateException) {
BatchUpdateException batchUpdateException = (BatchUpdateException) e;
if (e instanceof BatchUpdateException batchUpdateException) {
SQLException nextException = batchUpdateException.getNextException();
if(nextException instanceof PSQLException){
ServerErrorMessage serverErrorMessage = ((PSQLException) nextException).getServerErrorMessage();
Expand Down Expand Up @@ -3756,6 +3776,7 @@ public List<String> deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull
try {
return BulkImportQueries.deleteBulkImportUsers(this, appIdentifier, bulkImportUserIds);
} catch (SQLException e) {
Logging.error(this, "Error deleting bulk import users", true, e);
throw new StorageQueryException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden
}

public static void signUpMultipleForBulkImport_Transaction(Start start, Connection sqlCon, List<EmailPasswordImportUser> usersToSignUp)
throws StorageQueryException, StorageTransactionLogicException, SQLException {
throws StorageQueryException, StorageTransactionLogicException {
try {
String app_id_to_user_id_QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";
Expand Down
Loading