@@ -39,8 +39,8 @@ public abstract class RetryScheduler<T> {
3939 private Class <?> type = GenericsUtils .getSuperClassGenericType (this .getClass ());
4040
4141 // 定时检查是否有失败的反馈任务信息(给客户端的)
42- private ScheduledExecutorService RETRY_EXECUTOR_SERVICE = Executors . newSingleThreadScheduledExecutor ( new NamedThreadFactory ( "LTS-RetryScheduler-retry" , true )) ;
43- private ScheduledExecutorService MASTER_RETRY_EXECUTOR_SERVICE = Executors . newSingleThreadScheduledExecutor ( new NamedThreadFactory ( "LTS-RetryScheduler-master-retry" , true )) ;
42+ private ScheduledExecutorService RETRY_EXECUTOR_SERVICE ;
43+ private ScheduledExecutorService MASTER_RETRY_EXECUTOR_SERVICE ;
4444 private ScheduledFuture <?> masterScheduledFuture ;
4545 private ScheduledFuture <?> scheduledFuture ;
4646 private AtomicBoolean selfCheckStart = new AtomicBoolean (false );
@@ -93,6 +93,7 @@ public RetryScheduler(String name, AppContext appContext, String storePath, int
9393 public void start () {
9494 try {
9595 if (selfCheckStart .compareAndSet (false , true )) {
96+ this .RETRY_EXECUTOR_SERVICE = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("LTS-RetryScheduler-retry" , true ));
9697 // 这个时间后面再去优化
9798 scheduledFuture = RETRY_EXECUTOR_SERVICE .scheduleWithFixedDelay
9899 (new CheckSelfRunner (), 10 , 30 , TimeUnit .SECONDS );
@@ -106,6 +107,7 @@ public void start() {
106107 private void startMasterCheck () {
107108 try {
108109 if (masterCheckStart .compareAndSet (false , true )) {
110+ this .MASTER_RETRY_EXECUTOR_SERVICE = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("LTS-RetryScheduler-master-retry" , true ));
109111 // 这个时间后面再去优化
110112 masterScheduledFuture = MASTER_RETRY_EXECUTOR_SERVICE .
111113 scheduleWithFixedDelay (new CheckDeadFailStoreRunner (), 30 , 60 , TimeUnit .SECONDS );
@@ -121,7 +123,9 @@ private void stopMasterCheck() {
121123 if (masterCheckStart .compareAndSet (true , false )) {
122124 if (masterScheduledFuture != null ) {
123125 masterScheduledFuture .cancel (true );
126+ masterScheduledFuture = null ;
124127 MASTER_RETRY_EXECUTOR_SERVICE .shutdown ();
128+ MASTER_RETRY_EXECUTOR_SERVICE = null ;
125129 }
126130 LOGGER .info ("Stop {} master RetryScheduler success, identity=[{}]" , name , appContext .getConfig ().getIdentity ());
127131 }
@@ -135,8 +139,10 @@ public void stop() {
135139 if (selfCheckStart .compareAndSet (true , false )) {
136140 if (scheduledFuture != null ) {
137141 scheduledFuture .cancel (true );
142+ scheduledFuture = null ;
138143 failStore .close ();
139144 RETRY_EXECUTOR_SERVICE .shutdown ();
145+ RETRY_EXECUTOR_SERVICE = null ;
140146 }
141147 LOGGER .info ("Stop {} RetryScheduler success, identity=[{}]" , name , appContext .getConfig ().getIdentity ());
142148 }
@@ -155,13 +161,20 @@ public void destroy() {
155161 }
156162 }
157163
164+ private AtomicBoolean checkSelfRunnerStart = new AtomicBoolean (false );
165+
158166 /**
159167 * 定时检查 提交失败任务的Runnable
160168 */
161169 private class CheckSelfRunner implements Runnable {
162170
163171 @ Override
164172 public void run () {
173+
174+ if (!checkSelfRunnerStart .compareAndSet (false , true )) {
175+ return ;
176+ }
177+
165178 try {
166179 // 1. 检测 远程连接 是否可用
167180 if (!isRemotingEnable ()) {
@@ -200,17 +213,24 @@ public void run() {
200213
201214 } catch (Throwable e ) {
202215 LOGGER .error ("Run {} RetryScheduler error , identity=[{}]" , name , appContext .getConfig ().getIdentity (), e );
216+ } finally {
217+ checkSelfRunnerStart .set (false );
203218 }
204219 }
205220 }
206221
222+ private AtomicBoolean checkDeadFailStoreRunnerStart = new AtomicBoolean (false );
223+
207224 /**
208225 * 定时检查 已经down掉的机器的FailStore目录
209226 */
210227 private class CheckDeadFailStoreRunner implements Runnable {
211228
212229 @ Override
213230 public void run () {
231+ if (!checkDeadFailStoreRunnerStart .compareAndSet (false , true )) {
232+ return ;
233+ }
214234 try {
215235 // 1. 检测 远程连接 是否可用
216236 if (!isRemotingEnable ()) {
@@ -255,6 +275,8 @@ public void run() {
255275 }
256276 } catch (Throwable e ) {
257277 LOGGER .error ("Run {} master RetryScheduler error, identity=[{}] " , name , appContext .getConfig ().getIdentity (), e );
278+ } finally {
279+ checkDeadFailStoreRunnerStart .set (false );
258280 }
259281 }
260282 }
0 commit comments