@@ -31,8 +31,8 @@ public abstract class AbstractPreLoader implements PreLoader {
3131 // 预取阀值
3232 private double factor = 0.8 ;
3333
34- private ConcurrentHashMap <String /*taskTrackerNodeGroup*/ , List <JobPo >>
35- JOB_MAP = new ConcurrentHashMap <String , List <JobPo >>();
34+ private ConcurrentHashMap <String /*taskTrackerNodeGroup*/ , BlockingQueue <JobPo >>
35+ JOB_MAP = new ConcurrentHashMap <String , BlockingQueue <JobPo >>();
3636
3737 // 加载的信号
3838 private ConcurrentHashSet <String > LOAD_SIGNAL = new ConcurrentHashSet <String >();
@@ -58,7 +58,7 @@ public void run() {
5858 LOAD_SIGNAL .remove (loadTaskTrackerNodeGroup );
5959 }
6060 }
61- }, 3 , 1 , TimeUnit .SECONDS );
61+ }, 500 , 500 , TimeUnit .MILLISECONDS );
6262 }
6363
6464 application .getEventCenter ().subscribe (new EventSubscriber (application .getConfig ().getIdentity () + "_preLoader" , new Observer () {
@@ -117,10 +117,23 @@ public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) {
117117 protected abstract List <JobPo > load (String loadTaskTrackerNodeGroup , int offset );
118118
119119 private JobPo get (String taskTrackerNodeGroup ) {
120- List <JobPo > jobPos = JOB_MAP .get (taskTrackerNodeGroup );
120+ BlockingQueue <JobPo > jobPos = JOB_MAP .get (taskTrackerNodeGroup );
121121 if (jobPos == null ) {
122- jobPos = new CopyOnWriteArrayList <JobPo >();
123- List <JobPo > oldJobPos = JOB_MAP .putIfAbsent (taskTrackerNodeGroup , jobPos );
122+ jobPos = new PriorityBlockingQueue <JobPo >(step , new Comparator <JobPo >() {
123+ @ Override
124+ public int compare (JobPo left , JobPo right ) {
125+ int compare = left .getTriggerTime ().compareTo (right .getTriggerTime ());
126+ if (compare != 0 ) {
127+ return compare ;
128+ }
129+ compare = left .getPriority ().compareTo (left .getPriority ());
130+ if (compare != 0 ) {
131+ return compare ;
132+ }
133+ return left .getGmtCreated ().compareTo (right .getGmtCreated ());
134+ }
135+ });
136+ BlockingQueue <JobPo > oldJobPos = JOB_MAP .putIfAbsent (taskTrackerNodeGroup , jobPos );
124137 if (oldJobPos != null ) {
125138 jobPos = oldJobPos ;
126139 }
@@ -132,13 +145,7 @@ private JobPo get(String taskTrackerNodeGroup) {
132145 LOAD_SIGNAL .add (taskTrackerNodeGroup );
133146 }
134147 }
135- if (jobPos .size () > 0 ) {
136- try {
137- return jobPos .remove (0 );
138- } catch (ArrayIndexOutOfBoundsException e ) {
139- return null ;
140- }
141- }
142- return null ;
148+ return jobPos .poll ();
143149 }
150+
144151}
0 commit comments