diff --git a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java index e51d4a1f8..498a2a049 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java @@ -195,15 +195,30 @@ public List flush() throws SQLException { } private void executeUpdates() throws SQLException { - int[] batchStatus = updatePreparedStatement.executeBatch(); - for (int updateCount : batchStatus) { - if (updateCount == Statement.EXECUTE_FAILED) { - throw new BatchUpdateException( - "Execution failed for part of the batch update", batchStatus); + try { + updatePreparedStatement.executeBatch(); + } catch (BatchUpdateException bue) { + SQLException nextException = bue.getNextException(); + boolean hasNonDuplicateErrors = false; + + while (nextException != null) { + if (nextException.getErrorCode() == 1 || nextException.getMessage().contains("ORA-00001")) { + log.warn("Skipping duplicate record due to ORA-00001: {}", nextException.getMessage()); + } else { + hasNonDuplicateErrors = true; + log.error("SQL Error during batch update: {}", nextException.getMessage(), nextException); + } + nextException = nextException.getNextException(); + } + + // Only throw the original exception if it contains non-duplicate errors + if (hasNonDuplicateErrors) { + throw bue; + } } - } } + private void executeDeletes() throws SQLException { if (nonNull(deletePreparedStatement)) { int[] batchStatus = deletePreparedStatement.executeBatch();