6161import java .util .concurrent .ConcurrentHashMap ;
6262import java .util .concurrent .ConcurrentLinkedQueue ;
6363import java .util .concurrent .ExecutionException ;
64+ import java .util .concurrent .ExecutorService ;
6465import java .util .concurrent .Executors ;
6566import java .util .concurrent .ScheduledExecutorService ;
6667import java .util .concurrent .ScheduledFuture ;
68+ import java .util .concurrent .Semaphore ;
69+ import java .util .concurrent .ThreadFactory ;
6770import java .util .concurrent .TimeUnit ;
6871import java .util .concurrent .TimeoutException ;
6972import java .util .concurrent .atomic .AtomicInteger ;
@@ -133,8 +136,8 @@ private ElasticsearchIO() {} // disable new
133136 private static final ConcurrentHashMap <String , RestClient > clientPool = new ConcurrentHashMap <>();
134137 private static final ConcurrentHashMap <String , Integer > versionCache = new ConcurrentHashMap <>();
135138
136- // Shared scheduler for time-based operations
137- private static final ScheduledExecutorService scheduler = Executors . newScheduledThreadPool ( 2 );
139+ // Note: scheduler is now instance-scoped in AppendFn (see @Setup/@Teardown)
140+ // to avoid cross-pipeline interference when multiple pipelines share the JVM.
138141
139142 // Performance metrics
140143 private static final AtomicLong totalDocuments = new AtomicLong (0 );
@@ -185,10 +188,22 @@ public abstract static class ConnectionConf implements Serializable {
185188 // When true, ALL TLS certificate verification is disabled (not only for self-signed certs) — insecure
186189 public abstract boolean isIgnoreInsecureSSL ();
187190
188- // Connection pooling key for reuse
191+ /**
192+ * Connection pooling key for client reuse. Includes all security-relevant fields
193+ * so that connections with different credentials, SSL settings, or keystores
194+ * are never incorrectly shared.
195+ */
189196 public String getPoolKey () {
190- return getAddress () + ":" + getIndex () + ":" +
191- (getUsername () != null ? getUsername () : "noauth" );
197+ StringBuilder key = new StringBuilder ();
198+ key .append (getAddress ()).append ('|' )
199+ .append (getIndex ()).append ('|' )
200+ .append (getUsername () != null ? getUsername () : "noauth" ).append ('|' )
201+ .append (getPassword () != null ? getPassword ().hashCode () : 0 ).append ('|' )
202+ .append (getApiKey () != null ? getApiKey ().hashCode () : 0 ).append ('|' )
203+ .append (isIgnoreInsecureSSL ()).append ('|' )
204+ .append (isTrustSelfSignedCerts ()).append ('|' )
205+ .append (getKeystorePath () != null ? getKeystorePath () : "nokeys" );
206+ return key .toString ();
192207 }
193208
194209 abstract Builder builder ();
@@ -307,6 +322,17 @@ public HttpAsyncClientBuilder customizeHttpClient(
307322 }
308323
309324 if (isIgnoreInsecureSSL ()) {
325+ // SECURITY WARNING: This disables ALL TLS certificate verification.
326+ // An attacker in network position can intercept all traffic including
327+ // credentials and documents (MITM attack).
328+ String env = System .getenv ("ENVIRONMENT" );
329+ if ("production" .equalsIgnoreCase (env ) || "prod" .equalsIgnoreCase (env )) {
330+ throw new IllegalStateException (
331+ "isIgnoreInsecureSSL is FORBIDDEN in production environments. " +
332+ "Configure proper TLS certificates via keystorePath instead." );
333+ }
334+ logger .warn ("*** INSECURE SSL IS ENABLED — ALL CERTIFICATE VERIFICATION DISABLED ***" );
335+ logger .warn ("*** DO NOT USE IN PRODUCTION — VULNERABLE TO MITM ATTACKS ***" );
310336 try {
311337 SSLContext context = SSLContext .getInstance ("TLS" );
312338
@@ -615,9 +641,18 @@ static class AppendFn extends DoFn<String, Void> {
615641 // Performance optimization: reuse byte buffers
616642 private transient ThreadLocal <ByteArrayOutputStream > byteBufferPool ;
617643
644+ // Instance-scoped scheduler for time-based flushing (not static — avoids cross-pipeline interference)
645+ private transient ScheduledExecutorService scheduler ;
646+
618647 // Store ScheduledFuture to cancel on bundle finish
619648 private transient ScheduledFuture <?> flushTask ;
620649
650+ // Dedicated IO executor — avoids starving ForkJoinPool.commonPool() with blocking HTTP calls
651+ private transient ExecutorService ioExecutor ;
652+
653+ // Backpressure: limits concurrent in-flight ES requests
654+ private transient Semaphore concurrencySemaphore ;
655+
621656 // transient ConcurrentLinkedQueue — lock-free, no serialization issue
622657 private transient Queue <CompletableFuture <Void >> pendingOperations ;
623658
@@ -639,6 +674,17 @@ public void setup() throws IOException {
639674 // Initialize ThreadLocal after deserialization
640675 byteBufferPool = ThreadLocal .withInitial (() -> new ByteArrayOutputStream (8192 ));
641676
677+ // Instance-scoped scheduler — each DoFn instance gets its own, avoiding
678+ // cross-pipeline interference when cleanup() is called
679+ scheduler = Executors .newSingleThreadScheduledExecutor (daemonThreadFactory ("es-flush-" + poolKey ));
680+
681+ // Dedicated IO thread pool for ES bulk requests — never starves ForkJoinPool.commonPool()
682+ int maxConcurrent = spec .getMaxConcurrentRequests ();
683+ ioExecutor = Executors .newFixedThreadPool (maxConcurrent , daemonThreadFactory ("es-io-" + poolKey ));
684+
685+ // Semaphore enforces the configured maxConcurrentRequests as actual backpressure
686+ concurrencySemaphore = new Semaphore (maxConcurrent );
687+
642688 // Use cached version or get from server
643689 esVersion = versionCache .computeIfAbsent (poolKey , k -> {
644690 try {
@@ -686,10 +732,20 @@ public void startBundle(StartBundleContext context) {
686732 currentBatchSizeBytes = new AtomicLong (0 );
687733 lastFlushTime = System .currentTimeMillis ();
688734
689- // Schedule periodic flush and store the future for cancellation
735+ // Schedule periodic flush and store the future for cancellation.
736+ // IMPORTANT: scheduleAtFixedRate silently suppresses exceptions — if the
737+ // task throws, it stops running with no notification. We wrap in try-catch
738+ // to log and continue, preventing silent flush death.
690739 if (spec .getFlushIntervalMillis () > 0 ) {
691740 flushTask = scheduler .scheduleAtFixedRate (
692- this ::timeBasedFlush ,
741+ () -> {
742+ try {
743+ timeBasedFlush ();
744+ } catch (Throwable t ) {
745+ totalErrors .incrementAndGet ();
746+ logger .error ("Scheduled flush task failed — flush will continue on next interval" , t );
747+ }
748+ },
693749 spec .getFlushIntervalMillis (),
694750 spec .getFlushIntervalMillis (),
695751 TimeUnit .MILLISECONDS
@@ -767,6 +823,35 @@ public void closeClient() throws IOException {
767823 flushTask .cancel (false );
768824 flushTask = null ;
769825 }
826+
827+ // Shutdown instance-scoped scheduler
828+ if (scheduler != null ) {
829+ scheduler .shutdown ();
830+ try {
831+ if (!scheduler .awaitTermination (5 , TimeUnit .SECONDS )) {
832+ scheduler .shutdownNow ();
833+ }
834+ } catch (InterruptedException e ) {
835+ scheduler .shutdownNow ();
836+ Thread .currentThread ().interrupt ();
837+ }
838+ scheduler = null ;
839+ }
840+
841+ // Shutdown dedicated IO executor
842+ if (ioExecutor != null ) {
843+ ioExecutor .shutdown ();
844+ try {
845+ if (!ioExecutor .awaitTermination (10 , TimeUnit .SECONDS )) {
846+ ioExecutor .shutdownNow ();
847+ }
848+ } catch (InterruptedException e ) {
849+ ioExecutor .shutdownNow ();
850+ Thread .currentThread ().interrupt ();
851+ }
852+ ioExecutor = null ;
853+ }
854+
770855 // Don't close pooled clients - they're shared
771856 if (pendingOperations != null ) {
772857 pendingOperations .clear ();
@@ -793,21 +878,29 @@ private void flushBatchAsync() {
793878 lastFlushTime = System .currentTimeMillis ();
794879 }
795880
796- // Process batch asynchronously
881+ // Enforce backpressure: block if too many batches are in-flight
882+ try {
883+ concurrencySemaphore .acquire ();
884+ } catch (InterruptedException e ) {
885+ Thread .currentThread ().interrupt ();
886+ logger .warn ("Interrupted while waiting for backpressure semaphore" , e );
887+ return ;
888+ }
889+
890+ // Process batch on dedicated IO executor — never starves ForkJoinPool.commonPool()
797891 CompletableFuture <Void > future = CompletableFuture .runAsync (() -> {
798892 try {
799893 processBatch (currentBatch , batchBytes );
800894 } catch (Exception e ) {
801895 logger .error ("Failed to process batch of {} documents" , currentBatch .size (), e );
802896 totalErrors .incrementAndGet ();
803897 throw new RuntimeException ("Batch processing failed" , e );
898+ } finally {
899+ concurrencySemaphore .release ();
804900 }
805- });
901+ }, ioExecutor );
806902
807903 pendingOperations .add (future );
808-
809- // Periodic cleanup of completed futures
810- pendingOperations .removeIf (CompletableFuture ::isDone );
811904 }
812905
813906 private void processBatch (List <String > batchToProcess , long batchSizeBytes )
@@ -949,6 +1042,16 @@ private void timeBasedFlush() {
9491042 logger .warn ("Error during time-based flush" , e );
9501043 }
9511044 }
1045+
1046+ /** Creates a daemon ThreadFactory with the given name prefix. */
1047+ private static ThreadFactory daemonThreadFactory (String prefix ) {
1048+ AtomicInteger counter = new AtomicInteger (0 );
1049+ return r -> {
1050+ Thread t = new Thread (r , prefix + "-" + counter .getAndIncrement ());
1051+ t .setDaemon (true );
1052+ return t ;
1053+ };
1054+ }
9521055 }
9531056 }
9541057
@@ -1046,11 +1149,15 @@ static int getEsVersion(ConnectionConf connectionConf) {
10461149 }
10471150
10481151 /**
1049- * Cleanup method for releasing shared resources.
1152+ * Cleanup method for releasing shared resources (client pool and version cache) .
10501153 * Should be called when the application shuts down.
1154+ *
1155+ * Note: The scheduler and IO executor are instance-scoped and cleaned up
1156+ * in AppendFn.closeClient() (@Teardown), so they are NOT managed here.
1157+ * This prevents one pipeline's cleanup from killing another pipeline's resources.
10511158 */
10521159 public static void cleanup () {
1053- logger .info ("Cleaning up ElasticsearchIO resources..." );
1160+ logger .info ("Cleaning up ElasticsearchIO shared resources..." );
10541161
10551162 // Close all pooled clients
10561163 clientPool .forEach ((key , client ) -> {
@@ -1063,17 +1170,6 @@ public static void cleanup() {
10631170 clientPool .clear ();
10641171 versionCache .clear ();
10651172
1066- // Shutdown scheduler
1067- scheduler .shutdown ();
1068- try {
1069- if (!scheduler .awaitTermination (10 , TimeUnit .SECONDS )) {
1070- scheduler .shutdownNow ();
1071- }
1072- } catch (InterruptedException e ) {
1073- scheduler .shutdownNow ();
1074- Thread .currentThread ().interrupt ();
1075- }
1076-
10771173 logger .info ("ElasticsearchIO cleanup completed. Total documents: {}, batches: {}, errors: {}" ,
10781174 totalDocuments .get (), totalBatches .get (), totalErrors .get ());
10791175 }
0 commit comments