Skip to content

Commit 8c6bac1

Browse files
committed
UPSERT mode for DELETE operation not send external version
1 parent e4629e6 commit 8c6bac1

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import java.util.List;
5353
import java.util.Map;
5454

55+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.UPSERT;
56+
5557
public class DataConverter {
5658

5759
private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
@@ -161,7 +163,11 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
161163

162164
// delete
163165
if (record.value() == null) {
164-
return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record);
166+
if(config.writeMethod().name().equals(UPSERT.name())) {
167+
return new DeleteRequest(index).id(id);
168+
}else{
169+
return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record);
170+
}
165171
}
166172

167173
String payload = getPayload(record);

0 commit comments

Comments
 (0)