16
16
17
17
package org .kairosdb .plugin .remote ;
18
18
19
+ import com .google .common .base .Stopwatch ;
19
20
import com .google .common .collect .ArrayListMultimap ;
20
21
import com .google .common .collect .ImmutableSortedMap ;
21
22
import com .google .common .collect .Multimap ;
39
40
import java .nio .file .Files ;
40
41
import java .util .Arrays ;
41
42
import java .util .SortedMap ;
43
+ import java .util .concurrent .TimeUnit ;
42
44
import java .util .zip .GZIPOutputStream ;
43
45
44
46
import static com .google .common .base .Preconditions .checkArgument ;
45
47
import static com .google .common .base .Preconditions .checkNotNull ;
46
48
import static com .google .common .base .Strings .isNullOrEmpty ;
47
49
48
50
49
- public class RemoteDatastore
51
+ public class RemoteListener
50
52
{
51
- private static final Logger logger = LoggerFactory .getLogger (RemoteDatastore .class );
52
- private static final String DATA_DIR_PROP = "kairosdb.datastore.remote.data_dir" ;
53
- private static final String DROP_PERCENT_PROP = "kairosdb.datastore.remote.drop_on_used_disk_space_threshold_percent" ;
54
- private static final String METRIC_PREFIX_FILTER = "kairosdb.datastore.remote.prefix_filter" ;
55
-
56
- private static final String FILE_SIZE_METRIC = "kairosdb.datastore.remote.file_size" ;
57
- private static final String ZIP_FILE_SIZE_METRIC = "kairosdb.datastore.remote.zip_file_size" ;
58
- private static final String WRITE_SIZE_METRIC = "kairosdb.datastore.remote.write_size" ;
59
- private static final String TIME_TO_SEND_METRIC = "kairosdb.datastore.remote.time_to_send" ;
60
- private static final String DELETE_ZIP_METRIC = "kairosdb.datastore.remote.deleted_zipFile_size" ;
53
+ private static final Logger logger = LoggerFactory .getLogger (RemoteListener .class );
54
+ private static final String DATA_DIR_PROP = "kairosdb.remote.data_dir" ;
55
+ private static final String DROP_PERCENT_PROP = "kairosdb.remote.drop_on_used_disk_space_threshold_percent" ;
56
+ private static final String METRIC_PREFIX_FILTER = "kairosdb.remote.prefix_filter" ;
57
+
58
+ private static final String FILE_SIZE_METRIC = "kairosdb.remote.file_size" ;
59
+ private static final String ZIP_FILE_SIZE_METRIC = "kairosdb.remote.zip_file_size" ;
60
+ private static final String WRITE_SIZE_METRIC = "kairosdb.remote.write_size" ;
61
+ private static final String TIME_TO_SEND_METRIC = "kairosdb.remote.time_to_send" ;
62
+ private static final String DELETE_ZIP_METRIC = "kairosdb.remote.deleted_zipFile_size" ;
63
+ private static final String FLUSH_INTERVAL = "kairosdb.remote.flush_interval_ms" ;
64
+ private static final String MAX_FILE_SIZE_MB = "kairosdb.remote.max_file_size_mb" ;
61
65
62
66
private final Object m_dataFileLock = new Object ();
63
67
private final Object m_sendLock = new Object ();
64
68
private final int m_dropPercent ;
65
69
private final File m_dataDirectory ;
66
70
private final RemoteHost m_remoteHost ;
67
71
private final DiskUtils m_diskUtils ;
72
+ private final int m_flushInterval ;
73
+ private final ImmutableSortedMap <String , String > m_tags ;
68
74
private BufferedWriter m_dataWriter ;
69
75
private final Publisher <DataPointEvent > m_publisher ;
70
76
private String m_dataFileName ;
71
77
private volatile boolean m_firstDataPoint = true ;
72
78
private int m_dataPointCounter ;
79
+ private Stopwatch m_sendTimer = Stopwatch .createUnstarted ();
73
80
74
81
private volatile Multimap <DataPointKey , DataPoint > m_dataPointMultimap ;
75
82
private final Object m_mapLock = new Object (); //Lock for the above map
76
83
77
84
private volatile boolean m_running ;
78
85
79
- @ Inject
80
- @ Named ("HOSTNAME" )
81
- private String m_hostName = "localhost" ;
82
-
83
86
private String [] m_prefixFilterArray = new String [0 ];
84
87
88
+ private long m_maxFileSize = 1024 *1024 *10 ;
89
+
85
90
@ Inject
86
91
private LongDataPointFactory m_longDataPointFactory = new LongDataPointFactoryImpl ();
87
92
@@ -96,9 +101,13 @@ public void setPrefixFilter(@Named(METRIC_PREFIX_FILTER) String prefixFilter)
96
101
}
97
102
98
103
@ Inject
99
- public RemoteDatastore (@ Named (DATA_DIR_PROP ) String dataDir ,
100
- @ Named (DROP_PERCENT_PROP ) String dropPercent , RemoteHost remoteHost ,
101
- FilterEventBus eventBus , DiskUtils diskUtils ) throws IOException , DatastoreException
104
+ public RemoteListener (@ Named (DATA_DIR_PROP ) String dataDir ,
105
+ @ Named (DROP_PERCENT_PROP ) String dropPercent ,
106
+ @ Named (FLUSH_INTERVAL ) int flushInterval ,
107
+ @ Named ("HOSTNAME" ) String hostName ,
108
+ RemoteHost remoteHost ,
109
+ FilterEventBus eventBus ,
110
+ DiskUtils diskUtils ) throws IOException , DatastoreException
102
111
{
103
112
m_dataDirectory = new File (dataDir );
104
113
m_dropPercent = Integer .parseInt (dropPercent );
@@ -107,6 +116,11 @@ public RemoteDatastore(@Named(DATA_DIR_PROP) String dataDir,
107
116
m_remoteHost = checkNotNull (remoteHost , "remote host must not be null" );
108
117
m_publisher = eventBus .createPublisher (DataPointEvent .class );
109
118
m_diskUtils = checkNotNull (diskUtils , "diskUtils must not be null" );
119
+ m_flushInterval = flushInterval ;
120
+
121
+ m_tags = ImmutableSortedMap .<String , String >naturalOrder ()
122
+ .put ("host" , hostName )
123
+ .build ();
110
124
111
125
createNewMap ();
112
126
@@ -123,7 +137,7 @@ public RemoteDatastore(@Named(DATA_DIR_PROP) String dataDir,
123
137
{
124
138
flushMap ();
125
139
126
- Thread .sleep (2000 );
140
+ Thread .sleep (m_flushInterval );
127
141
}
128
142
catch (Exception e )
129
143
{
@@ -138,6 +152,12 @@ public RemoteDatastore(@Named(DATA_DIR_PROP) String dataDir,
138
152
139
153
}
140
154
155
+ @ Inject
156
+ public void setMaxFileSize (@ Named (MAX_FILE_SIZE_MB )long maxFileSize )
157
+ {
158
+ m_maxFileSize = maxFileSize * 1024 * 1024 ;
159
+ }
160
+
141
161
private Multimap <DataPointKey , DataPoint > createNewMap ()
142
162
{
143
163
Multimap <DataPointKey , DataPoint > ret ;
@@ -161,6 +181,9 @@ private void flushMap()
161
181
{
162
182
try
163
183
{
184
+ //Check if we need to role to a new file because of size
185
+ rollAndZipFile (System .currentTimeMillis (), true );
186
+
164
187
for (DataPointKey dataPointKey : flushMap .keySet ())
165
188
{
166
189
//We have to clear the writer every time or it gets confused
@@ -359,7 +382,7 @@ private void cleanDiskSpace()
359
382
Files .delete (zipFiles [0 ].toPath ());
360
383
logger .warn ("Disk is too full to create zip file. Deleted older zip file " + zipFiles [0 ].getName () + " size: " + size );
361
384
// For forwarding this metric will be reported both on the local kairos node and the remote
362
- m_publisher .post (new DataPointEvent (DELETE_ZIP_METRIC , ImmutableSortedMap . of ( "host" , m_hostName ) ,
385
+ m_publisher .post (new DataPointEvent (DELETE_ZIP_METRIC , m_tags ,
363
386
m_longDataPointFactory .createDataPoint (System .currentTimeMillis (), size )));
364
387
cleanDiskSpace (); // continue cleaning until space is freed up or all zip files are deleted.
365
388
}
@@ -376,39 +399,65 @@ private boolean hasSpace()
376
399
return m_dropPercent >= 100 || m_diskUtils .percentAvailable (m_dataDirectory ) < m_dropPercent ;
377
400
}
378
401
379
- void sendData () throws IOException
402
+ //Rolls to a new file and zips up the current one
403
+ private void rollAndZipFile (long now , boolean conditionalRoll ) throws IOException
380
404
{
381
- synchronized (m_sendLock )
382
- {
383
- String oldDataFile = m_dataFileName ;
384
- long now = System .currentTimeMillis ();
385
-
386
- long fileSize = (new File (m_dataFileName )).length ();
405
+ int dataPointCounter ;
406
+ String oldDataFile ;
407
+ long fileSize ;
387
408
388
- ImmutableSortedMap < String , String > tags = ImmutableSortedMap .< String , String > naturalOrder ( )
389
- . put ( "host" , m_hostName )
390
- . build ();
409
+ synchronized ( m_dataFileLock )
410
+ {
411
+ fileSize = ( new File ( m_dataFileName )). length ();
391
412
392
- int dataPointCounter ;
393
- synchronized (m_dataFileLock )
413
+ if (conditionalRoll )
394
414
{
395
- closeDataFile ();
396
- dataPointCounter = m_dataPointCounter ;
397
- openDataFile () ;
415
+ //Check file size
416
+ if ( fileSize < m_maxFileSize )
417
+ return ;
398
418
}
399
419
400
- long zipSize = zipFile (oldDataFile );
420
+ oldDataFile = m_dataFileName ;
421
+
422
+ closeDataFile ();
423
+ //m_dataPointCounter gets reset in openDataFile()
424
+ dataPointCounter = m_dataPointCounter ;
425
+ openDataFile ();
426
+ }
427
+
428
+ long zipSize = zipFile (oldDataFile );
429
+
430
+ try
431
+ {
432
+ putDataPoint (new DataPointEvent (FILE_SIZE_METRIC , m_tags , m_longDataPointFactory .createDataPoint (now , fileSize ), 0 ));
433
+ putDataPoint (new DataPointEvent (WRITE_SIZE_METRIC , m_tags , m_longDataPointFactory .createDataPoint (now , dataPointCounter ), 0 ));
434
+ putDataPoint (new DataPointEvent (ZIP_FILE_SIZE_METRIC , m_tags , m_longDataPointFactory .createDataPoint (now , zipSize ), 0 ));
435
+ }
436
+ catch (DatastoreException e )
437
+ {
438
+ logger .error ("Error writing remote metrics" , e );
439
+ }
440
+ }
441
+
442
+ //Called by RemoteSendJob that is on a timer set in config
443
+ void sendData () throws IOException
444
+ {
445
+ synchronized (m_sendLock )
446
+ {
447
+
448
+ long now = System .currentTimeMillis ();
449
+ m_sendTimer .start ();
450
+
451
+ rollAndZipFile (now , false );
401
452
402
453
sendAllZipfiles ();
403
454
404
- long timeToSend = System .currentTimeMillis () - now ;
455
+ long timeToSend = m_sendTimer .elapsed (TimeUnit .MILLISECONDS );
456
+ m_sendTimer .reset ();
405
457
406
458
try
407
459
{
408
- putDataPoint (new DataPointEvent (FILE_SIZE_METRIC , tags , m_longDataPointFactory .createDataPoint (now , fileSize ), 0 ));
409
- putDataPoint (new DataPointEvent (WRITE_SIZE_METRIC , tags , m_longDataPointFactory .createDataPoint (now , dataPointCounter ), 0 ));
410
- putDataPoint (new DataPointEvent (ZIP_FILE_SIZE_METRIC , tags , m_longDataPointFactory .createDataPoint (now , zipSize ), 0 ));
411
- putDataPoint (new DataPointEvent (TIME_TO_SEND_METRIC , tags , m_longDataPointFactory .createDataPoint (now , timeToSend ), 0 ));
460
+ putDataPoint (new DataPointEvent (TIME_TO_SEND_METRIC , m_tags , m_longDataPointFactory .createDataPoint (now , timeToSend ), 0 ));
412
461
}
413
462
catch (DatastoreException e )
414
463
{
0 commit comments