From fa26b073e36c928c85304f2b93fe7d221c1ec7d0 Mon Sep 17 00:00:00 2001 From: Jonathan Creasy Date: Tue, 27 Oct 2020 22:58:53 -0400 Subject: [PATCH 1/2] Added Prometheus Exporter to the status interface --- collectors/etc/prometheus_conf.py | 40 ++-- tcollector.py | 357 ++++++++++++++++++------------ 2 files changed, 233 insertions(+), 164 deletions(-) diff --git a/collectors/etc/prometheus_conf.py b/collectors/etc/prometheus_conf.py index 1e225fac..4dda3ca7 100644 --- a/collectors/etc/prometheus_conf.py +++ b/collectors/etc/prometheus_conf.py @@ -1,29 +1,21 @@ #!/usr/bin/env python def enabled(): - return True + return False + def get_settings(): - """Prometheus Exporter Targets Connection Details""" - return { - 'targets': [{ - 'target_name': 'hazelcast', - 'target_host': 'localhost', - 'target_port': 8080, - }, - { - 'target_host': 'localhost', - 'target_port': 8080, - }, - { - 'target_service': 'hazelcast', - 'target_instance': 'hazelcast01.consul', - 'target_host': 'localhost', - 'target_port': 8080, - 'collection_interval': 5 - } - ], - 'collection_interval': 15, # seconds, How often to collect metric data - 'default_timeout': 10.0, # seconds - 'include_service_tags': False - } + """Prometheus Exporter Targets Connection Details""" + return { + 'targets': [{ + 'target_service': 'hazelcast', + 'target_instance': 'hazelcast01.consul', + 'target_host': 'localhost', + 'target_port': 8080, + 'collection_interval': 15 + } + ], + 'collection_interval': 60, # seconds, How often to collect metric data + 'default_timeout': 10.0, # seconds + 'include_service_tags': True + } diff --git a/tcollector.py b/tcollector.py index 21f9b231..f80419a5 100755 --- a/tcollector.py +++ b/tcollector.py @@ -38,21 +38,25 @@ from optparse import OptionParser -import collections - PY3 = sys.version_info[0] > 2 if PY3: import importlib - from queue import Queue, Empty, Full # pylint: disable=import-error - from urllib.request import Request, urlopen # pylint: disable=maybe-no-member,no-name-in-module,import-error - from urllib.error import HTTPError, URLError # pylint: disable=maybe-no-member,no-name-in-module,import-error - from http.server import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error - from collections.abc import Callable # pylint: disable=maybe-no-member,no-name-in-module,import-error + from queue import Queue, Empty, Full # pylint: disable=import-error + from urllib.request import Request, urlopen # pylint: disable=maybe-no-member,no-name-in-module,import-error + from urllib.error import HTTPError, URLError # pylint: disable=maybe-no-member,no-name-in-module,import-error + from http.server import HTTPServer, \ + BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error + from collections.abc import Callable # pylint: disable=maybe-no-member,no-name-in-module,import-error else: - from Queue import Queue, Empty, Full # pylint: disable=maybe-no-member,no-name-in-module,import-error - from urllib2 import Request, urlopen, HTTPError, URLError # pylint: disable=maybe-no-member,no-name-in-module,import-error - from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler # pylint: disable=maybe-no-member,no-name-in-module,import-error - from collections import Callable # pylint: disable=maybe-no-member,no-name-in-module,import-error + # noinspection PyUnresolvedReferences + from Queue import Queue, Empty, Full # pylint: disable=maybe-no-member,no-name-in-module,import-error + # noinspection PyUnresolvedReferences + from urllib2 
import Request, urlopen, HTTPError, \
+        URLError  # pylint: disable=maybe-no-member,no-name-in-module,import-error
+    # noinspection PyUnresolvedReferences
+    from BaseHTTPServer import HTTPServer, \
+        BaseHTTPRequestHandler  # pylint: disable=maybe-no-member,no-name-in-module,import-error
+    from collections import Callable  # pylint: disable=maybe-no-member,no-name-in-module,import-error
 
 # global variables.
 COLLECTORS = {}
@@ -103,6 +107,25 @@ def nput(self, value):
         return True
 
 
+class ItemStore(object):
+    def __init__(self):
+        self.cond = threading.Condition()
+        self.items = []
+
+    def add(self, item):
+        with self.cond:
+            self.items.append(item)
+            self.cond.notify()  # Wake 1 thread waiting on cond (if any)
+
+    def getAll(self, blocking=False):
+        with self.cond:
+            # If blocking is true, always return at least 1 item
+            while blocking and len(self.items) == 0:
+                self.cond.wait()
+            items, self.items = self.items, []
+            return items
+
+
 class Collector(object):
     """A Collector is a script that is run that gathers some data
     and prints it out in standard TSD format on STDOUT.  This
@@ -182,7 +205,7 @@ def read(self):
             LOG.exception('caught exception, collector process went away while reading stdout')
         except TypeError as exc:
             # Sometimes the underlying buffer.read() returns None
-            pass
+            return
         except:
             LOG.exception('uncaught exception in stdout read')
             return
@@ -198,7 +225,7 @@ def read(self):
             if line:
                 self.datalines.append(line)
                 self.last_datapoint = int(time.time())
-            self.buffer = self.buffer[idx+1:]
+            self.buffer = self.buffer[idx + 1:]
 
     def collect(self):
         """Reads input from the collector and returns the lines up to whomever
@@ -264,20 +291,58 @@ def do_GET(self):
         # another thread changing them midway (it's integers and strings and
         # the like), so worst case it's a tiny bit internally inconsistent.
         # Which is fine for monitoring.
-        result = json.dumps([c.to_json() for c in self.server.collectors.values()])
-        self.send_response(200)
-        self.send_header("content-type", "text/json")
-        self.send_header("content-length", str(len(result)))
-        self.end_headers()
-        if PY3:
-            result = result.encode("utf-8")
-        self.wfile.write(result)
+        if self.path == "/" or self.path == "/status":
+            result = json.dumps([c.to_json() for c in self.server.collectors.values()])
+            self.send_response(200)
+            self.send_header("content-type", "text/json")
+            self.send_header("content-length", str(len(result)))
+            self.end_headers()
+            if PY3:
+                result = result.encode("utf-8")
+            self.wfile.write(result)
+        elif self.path == "/metrics":
+            LOG.debug("Handling Request for /metrics")
+            result = "com_opentsdb_tcollector_reader_lines_collected{} %s" % (self.server.reader.lines_collected)
+            result = "%s\ncom_opentsdb_tcollector_reader_lines_dropped{} %s" % (result, self.server.reader.lines_dropped)
+
+            for c in self.server.collectors.values():
+                result = "%s\ncom_opentsdb_tcollector_collector_lines_sent{collector=\"%s\"} %s" % (result, c.name, int(c.lines_sent))
+                result = "%s\ncom_opentsdb_tcollector_collector_lines_received{collector=\"%s\"} %s" % (result, c.name, int(c.lines_received))
+                result = "%s\ncom_opentsdb_tcollector_collector_lines_invalid{collector=\"%s\"} %s" % (result, c.name, int(c.lines_invalid))
+                result = "%s\ncom_opentsdb_tcollector_collector_dead{collector=\"%s\"} %s" % (result, c.name, int(c.dead))
+                result = "%s\ncom_opentsdb_tcollector_collector_last_datapoint{collector=\"%s\"} %s" % (result, c.name, int(c.last_datapoint * 1000))
+                result = "%s\ncom_opentsdb_tcollector_collector_last_spawn{collector=\"%s\"} %s" % (result, c.name, int(c.lastspawn * 1000))
+
+            metrics = self.server.reader.latestMetrics.getAll()
+            for metricJSON in metrics:
+                line = json.loads(metricJSON)
+                metric = line[0].strip().replace(".", "_")
+                tagString = line[3].strip()
+                result_tags = ""
+                if tagString != "":
+                    tags = dict(item.split("=") for item in tagString.split(" "))
+                    for tag in tags.keys():
+                        result_tags = "%s, %s=\"%s\"" % (result_tags, tag, tags[tag])
+                timestamp = line[1]
+                value = line[2]
+                result = "%s\n%s{%s} %s %d" % (result, metric, result_tags.lstrip(", "), value, int(timestamp * 1000))
+            self.send_response(200)
+            self.send_header("content-type", "text/plain; version=0.0.4")
+            self.send_header("content-length", str(len(result)))
+            self.end_headers()
+            if PY3:
+                result = result.encode("utf-8")
+            self.wfile.write(result)
+        else:
+            # Unknown path: send an explicit 404 instead of an empty reply.
+            self.send_response(404)
+            self.end_headers()
 
 
 class StatusServer(HTTPServer):
     """Serves status of collectors over HTTP."""
 
-    def __init__(self, interface, port, collectors):
+    def __init__(self, interface, port, collectors, reader):
         """
         interface: the interface to listen on, e.g. "127.0.0.1".
         port: the port to listen on, e.g. 8080.
@@ -285,7 +346,9 @@ def __init__(self, interface, port, collectors):
         global COLLECTORS.
         """
         self.collectors = collectors
-        HTTPServer.__init__(self, (interface, port), StatusRequestHandler)
+        self.reader = reader
+        LOG.debug("HTTP Server listening on %s:%s" % (interface, port))
+        HTTPServer.__init__(self, (interface, int(port)), StatusRequestHandler)
 
 
 class StdinCollector(Collector):
@@ -314,14 +377,11 @@ def read(self):
         else:
             ALIVE = False
 
-
     def shutdown(self):
 
         pass
 
-
-
 class ReaderThread(threading.Thread):
     """The main ReaderThread is responsible for reading from the collectors
     and assuring that we always read from the input no matter what.
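
[Editor's note: the ItemStore added above is a small condition-variable mailbox. process_line() adds one JSON-encoded datapoint at a time, and each /metrics scrape drains everything accumulated since the previous scrape, so a datapoint is exposed at most once. A minimal standalone sketch of those semantics follows; the producer thread, item strings, and timing are invented for illustration, while the ItemStore body mirrors the class in the patch.]

import threading
import time

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify()  # wake one waiting consumer, if any

    def getAll(self, blocking=False):
        with self.cond:
            # with blocking=True, wait until at least one item exists
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
            return items

store = ItemStore()

def producer():
    for n in range(3):
        store.add('{"metric": "demo.%d"}' % n)
        time.sleep(0.1)

t = threading.Thread(target=producer)
t.start()
print(store.getAll(blocking=True))  # at least one item, maybe more
t.join()
print(store.getAll())               # whatever arrived after the first drain, possibly []
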
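[Editor's note: for reference, a scrape of the new endpoint yields Prometheus text-format (version 0.0.4) output shaped like the following; the collector name, tag, values, and timestamps are invented for illustration.]

com_opentsdb_tcollector_reader_lines_collected{} 1024
com_opentsdb_tcollector_reader_lines_dropped{} 0
com_opentsdb_tcollector_collector_lines_sent{collector="procstats"} 512
com_opentsdb_tcollector_collector_lines_received{collector="procstats"} 512
com_opentsdb_tcollector_collector_lines_invalid{collector="procstats"} 0
com_opentsdb_tcollector_collector_dead{collector="procstats"} 0
com_opentsdb_tcollector_collector_last_datapoint{collector="procstats"} 1603850000000
com_opentsdb_tcollector_collector_last_spawn{collector="procstats"} 1603849000000
proc_loadavg_1min{host="web01"} 0.42 1603850000000

[Per the handler above, dots in OpenTSDB metric names become underscores, and the replayed datapoint samples carry millisecond timestamps while the self-reported counters do not.]
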
@@ -347,6 +407,7 @@ def __init__(self, dedupinterval, evictinterval, deduponlyzero, ns_prefix=""):
                       dedupinterval)
         super(ReaderThread, self).__init__()
+        self.latestMetrics = ItemStore()
         self.readerq = ReaderQueue(MAX_READQ_SIZE)
         self.lines_collected = 0
         self.lines_dropped = 0
@@ -401,10 +462,10 @@ def process_line(self, col, line):
 
         line = self.ns_prefix + line
 
-        parsed = re.match('^([-_./a-zA-Z0-9]+)\s+' # Metric name.
-                          '(\d+\.?\d+)\s+' # Timestamp.
-                          '(\S+?)' # Value (int or float).
-                          '((?:\s+[-_./a-zA-Z0-9]+=[-_./a-zA-Z0-9]+)*)$', # Tags
+        parsed = re.match(r'^([-_./a-zA-Z0-9]+)\s+'  # Metric name.
+                          r'(\d+\.?\d+)\s+'  # Timestamp.
+                          r'(\S+?)'  # Value (int or float).
+                          r'((?:\s+[-_./a-zA-Z0-9]+=[-_./a-zA-Z0-9]+)*)$',  # Tags
                           line)
         if parsed is None:
             LOG.warning('%s sent invalid data: %s', col.name, line)
@@ -413,21 +474,22 @@ def process_line(self, col, line):
         metric, timestamp, value, tags = parsed.groups()
 
         if isinstance(value, bool):
-            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
-            value = int(value)
+            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
+            value = int(value)
 
         if PY3:
-            string_types = str
+            string_types = str
         else:
-            string_types = basestring  # pylint:disable=undefined-variable
+            # noinspection PyUnresolvedReferences
+            string_types = basestring  # pylint:disable=undefined-variable
 
         if isinstance(value, string_types) and value.lower() == 'true':
-            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
-            value = 1
+            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
+            value = 1
 
         if isinstance(value, string_types) and value.lower() == 'false':
-            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
-            value = 0
+            LOG.warning('%s sent boolean value, converted to int: %s', col.name, line)
+            value = 0
 
         try:
             # The regex above is fairly open, and would leave values like 'True' through
@@ -480,8 +542,8 @@ def process_line(self, col, line):
                 # value instead of the last. Fall through if we reach
                 # the dedup interval so we can print the value.
if ((not self.deduponlyzero or (self.deduponlyzero and float(value) == 0.0)) and - col.values[key][0] == value and - (timestamp - col.values[key][3] < local_dedupinterval)): + col.values[key][0] == value and + (timestamp - col.values[key][3] < local_dedupinterval)): col.values[key] = (value, True, line, col.values[key][3]) return @@ -490,8 +552,8 @@ def process_line(self, col, line): # replay the last value we skipped (if changed) so the jumps in # our graph are accurate, if ((col.values[key][1] or - (timestamp - col.values[key][3] >= local_dedupinterval)) - and col.values[key][0] != value): + (timestamp - col.values[key][3] >= local_dedupinterval)) + and col.values[key][0] != value): col.lines_sent += 1 if not self.readerq.nput(col.values[key][2]): self.lines_dropped += 1 @@ -508,6 +570,11 @@ def process_line(self, col, line): col.lines_sent += 1 if not self.readerq.nput(line): self.lines_dropped += 1 + self.store_latest(metric, timestamp, value, tags) + + def store_latest(self, metric, timestamp, value, tags): + # LOG.debug("adding to ItemStore: %s" % (json.dumps([metric, timestamp, value, tags]))) + self.latestMetrics.add(json.dumps([metric, timestamp, value, tags])) class SenderThread(threading.Thread): @@ -538,7 +605,7 @@ def __init__(self, reader, dryrun, hosts, self_report_stats, tags, self.dryrun = dryrun self.reader = reader - self.tags = sorted(tags.items()) # dictionary transformed to list + self.tags = sorted(tags.items()) # dictionary transformed to list self.http = http self.http_api_path = http_api_path self.http_username = http_username @@ -551,13 +618,13 @@ def __init__(self, reader, dryrun, hosts, self_report_stats, tags, self.current_tsd = -1 # Index in self.hosts where we're at. self.host = None # The current TSD host we've selected. self.port = None # The port of the current TSD. - self.tsd = None # The socket connected to the aforementioned TSD. + self.tsd = None # The socket connected to the aforementioned TSD. self.last_verify = 0 - self.reconnectinterval = reconnectinterval # in seconds. + self.reconnectinterval = reconnectinterval # in seconds. self.time_reconnect = 0 # if reconnectinterval > 0, used to track the time. self.sendq = [] self.self_report_stats = self_report_stats - self.maxtags = maxtags # The maximum number of tags TSD will accept. + self.maxtags = maxtags # The maximum number of tags TSD will accept. def pick_connection(self): """Picks up a random host/port connection.""" @@ -587,6 +654,9 @@ def blacklist_connection(self): LOG.info('Blacklisting %s:%s for a while', self.host, self.port) self.blacklisted_hosts.add((self.host, self.port)) + def append(self, line): + self.sendq.append(line) + def run(self): """Main loop. A simple scheduler. Loop waiting for 5 seconds for data on the queue. 
If there's no data, just
@@ -603,7 +673,7 @@ def run(self):
                     line = self.reader.readerq.get(True, 5)
                 except Empty:
                     continue
-                self.sendq.append(line)
+                self.append(line)
                 time.sleep(5)  # Wait for more data
                 while True:
                     # prevents self.sendq fast growing in case of sending fails
@@ -614,7 +684,7 @@ def run(self):
                         line = self.reader.readerq.get(False)
                     except Empty:
                         break
-                    self.sendq.append(line)
+                    self.append(line)
 
                 if ALIVE:
                     self.send_data()
@@ -654,7 +724,7 @@ def verify_conn(self):
             try:
                 self.tsd.close()
             except socket.error as msg:
-                pass # not handling that
+                pass  # not handling that
             self.time_reconnect = time.time()
             return False
@@ -697,11 +767,11 @@ def verify_conn(self):
         # TODO need to fix this for http
         if self.self_report_stats:
             strs = [
-                   ('reader.lines_collected',
-                    '', self.reader.lines_collected),
-                   ('reader.lines_dropped',
-                    '', self.reader.lines_dropped)
-                  ]
+                ('reader.lines_collected',
+                 '', self.reader.lines_collected),
+                ('reader.lines_dropped',
+                 '', self.reader.lines_dropped)
+            ]
 
             for col in all_living_collectors():
                 strs.append(('collector.lines_sent', 'collector='
@@ -715,7 +785,7 @@ def verify_conn(self):
             strout = ["tcollector.%s %d %d %s"
                       % (x[0], ts, x[2], x[1]) for x in strs]
             for string in strout:
-                self.sendq.append(string)
+                self.append(string)
 
             break  # TSD is alive.
@@ -767,7 +837,7 @@ def maintain_conn(self):
                     self.tsd.settimeout(15)
                     self.tsd.connect(sockaddr)
                     # if we get here it connected
-                    LOG.debug('Connection to %s was successful'%(str(sockaddr)))
+                    LOG.debug('Connection to %s was successful' % (str(sockaddr)))
                     break
                 except socket.error as msg:
                     LOG.warning('Connection attempt failed to %s:%d: %s',
@@ -830,9 +900,9 @@ def build_http_url(self):
             protocol = "https"
         else:
             protocol = "http"
-        details=""
+        details = ""
        if LOG.level == logging.DEBUG:
-            details="?details"
+            details = "?details"
         return "%s://%s:%s/%s%s" % (protocol, self.host, self.port, self.http_api_path, details)
 
     def send_data_via_http(self):
@@ -843,10 +913,10 @@ def send_data_via_http(self):
             parts = line.split(None, 3)
             # not all metrics have metric-specific tags
             if len(parts) == 4:
-                (metric, timestamp, value, raw_tags) = parts
+                (metric, timestamp, value, raw_tags) = parts
             else:
-                (metric, timestamp, value) = parts
-                raw_tags = ""
+                (metric, timestamp, value) = parts
+                raw_tags = ""
             # process the tags
             metric_tags = {}
             for tag in raw_tags.strip().split():
@@ -858,11 +928,13 @@ def send_data_via_http(self):
             metric_entry["value"] = float(value)
             metric_entry["tags"] = dict(self.tags).copy()
             if len(metric_tags) + len(metric_entry["tags"]) > self.maxtags:
-                metric_tags_orig = set(metric_tags)
-                subset_metric_keys = frozenset(metric_tags[:len(metric_tags[:self.maxtags-len(metric_entry["tags"])])])
-                metric_tags = dict((k, v) for k, v in metric_tags.items() if k in subset_metric_keys)
-                LOG.error("Exceeding maximum permitted metric tags - removing %s for metric %s",
-                          str(metric_tags_orig - set(metric_tags)), metric)
+                metric_tags_orig = set(metric_tags)
+                # Dicts are not sliceable; keep the first N tag keys instead.
+                subset_metric_keys = frozenset(
+                    list(metric_tags)[:self.maxtags - len(metric_entry["tags"])])
+                metric_tags = dict((k, v) for k, v in metric_tags.items() if k in subset_metric_keys)
+                LOG.error("Exceeding maximum permitted metric tags - removing %s for metric %s",
+                          str(metric_tags_orig - set(metric_tags)), metric)
             metric_entry["tags"].update(metric_tags)
             metrics.append(metric_entry)
 
@@ -872,15 +943,15 @@ def send_data_via_http(self):
                                         indent=4))
             return
 
-        if((self.current_tsd == -1) or (len(self.hosts) > 1)):
+        if ((self.current_tsd == -1) or
                (len(self.hosts) > 1)):
             self.pick_connection()
         url = self.build_http_url()
         LOG.debug("Sending metrics to url: %s", url)
         req = Request(url)
         if self.http_username and self.http_password:
-            req.add_header("Authorization", "Basic %s"
-                           % base64.b64encode("%s:%s" % (self.http_username, self.http_password)))
+            auth = base64.b64encode(("%s:%s" % (self.http_username, self.http_password)).encode("utf-8"))
+            req.add_header("Authorization", "Basic %s" % auth.decode("ascii"))
         req.add_header("Content-Type", "application/json")
         try:
             body = json.dumps(metrics)
@@ -970,82 +1041,82 @@ def parse_cmdline(argv):
 
     # get arguments
     parser = OptionParser(description='Manages collectors which gather '
-                          'data and report back.')
+                          'data and report back.')
     parser.add_option('-c', '--collector-dir', dest='cdir', metavar='DIR',
-                      default=defaults['cdir'],
-                      help='Directory where the collectors are located.')
+                      default=defaults['cdir'],
+                      help='Directory where the collectors are located.')
     parser.add_option('-d', '--dry-run', dest='dryrun', action='store_true',
-                      default=defaults['dryrun'],
-                      help='Don\'t actually send anything to the TSD, '
+                      default=defaults['dryrun'],
+                      help='Don\'t actually send anything to the TSD, '
                       'just print the datapoints.')
     parser.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
-                      default=defaults['daemonize'],
-                      help='Run as a background daemon.')
+                      default=defaults['daemonize'],
+                      help='Run as a background daemon.')
     parser.add_option('-H', '--host', dest='host',
-                      metavar='HOST',
-                      default=defaults['host'],
-                      help='Hostname to use to connect to the TSD.')
+                      metavar='HOST',
+                      default=defaults['host'],
+                      help='Hostname to use to connect to the TSD.')
    parser.add_option('-L', '--hosts-list', dest='hosts',
-                      metavar='HOSTS',
-                      default=defaults['hosts'],
-                      help='List of host:port to connect to tsd\'s (comma separated).')
+                      metavar='HOSTS',
+                      default=defaults['hosts'],
+                      help='List of host:port to connect to tsd\'s (comma separated).')
     parser.add_option('--no-tcollector-stats', dest='no_tcollector_stats',
-                      action='store_true',
-                      default=defaults['no_tcollector_stats'],
-                      help='Prevent tcollector from reporting its own stats to TSD')
+                      action='store_true',
+                      default=defaults['no_tcollector_stats'],
+                      help='Prevent tcollector from reporting its own stats to TSD')
     parser.add_option('-s', '--stdin', dest='stdin', action='store_true',
-                      default=defaults['stdin'],
-                      help='Run once, read and dedup data points from stdin.')
+                      default=defaults['stdin'],
+                      help='Run once, read and dedup data points from stdin.')
     parser.add_option('-p', '--port', dest='port', type='int',
-                      default=defaults['port'], metavar='PORT',
-                      help='Port to connect to the TSD instance on. '
-                      'default=%default')
+                      default=defaults['port'], metavar='PORT',
+                      help='Port to connect to the TSD instance on. '
+                      'default=%default')
     parser.add_option('-v', dest='verbose', action='store_true',
-                      default=defaults['verbose'],
-                      help='Verbose mode (log debug messages).')
+                      default=defaults['verbose'],
+                      help='Verbose mode (log debug messages).')
    parser.add_option('-t', '--tag', dest='tags', action='append',
-                      default=defaults['tags'], metavar='TAG',
-                      help='Tags to append to all timeseries we send, '
-                      'e.g.: -t TAG=VALUE -t TAG2=VALUE')
+                      default=defaults['tags'], metavar='TAG',
+                      help='Tags to append to all timeseries we send, '
+                      'e.g.: -t TAG=VALUE -t TAG2=VALUE')
     parser.add_option('-P', '--pidfile', dest='pidfile',
-                      default=defaults['pidfile'],
-                      metavar='FILE', help='Write our pidfile')
+                      default=defaults['pidfile'],
+                      metavar='FILE', help='Write our pidfile')
     parser.add_option('--dedup-interval', dest='dedupinterval', type='int',
-                      default=defaults['dedupinterval'], metavar='DEDUPINTERVAL',
-                      help='Number of seconds in which successive duplicate '
+                      default=defaults['dedupinterval'], metavar='DEDUPINTERVAL',
+                      help='Number of seconds in which successive duplicate '
                       'datapoints are suppressed before sending to the TSD. '
                       'Use zero to disable. '
                       'default=%default')
     parser.add_option('--dedup-only-zero', dest='deduponlyzero', action='store_true',
-                      default=defaults['deduponlyzero'],
-                      help='Only dedup 0 values.')
+                      default=defaults['deduponlyzero'],
+                      help='Only dedup 0 values.')
     parser.add_option('--evict-interval', dest='evictinterval', type='int',
-                      default=defaults['evictinterval'], metavar='EVICTINTERVAL',
-                      help='Number of seconds after which to remove cached '
+                      default=defaults['evictinterval'], metavar='EVICTINTERVAL',
+                      help='Number of seconds after which to remove cached '
                       'values of old data points to save memory. '
                       'default=%default')
     parser.add_option('--allowed-inactivity-time', dest='allowed_inactivity_time', type='int',
-                      default=ALLOWED_INACTIVITY_TIME, metavar='ALLOWEDINACTIVITYTIME',
-                      help='How long to wait for datapoints before assuming '
-                           'a collector is dead and restart it. '
-                           'default=%default')
+                      default=ALLOWED_INACTIVITY_TIME, metavar='ALLOWEDINACTIVITYTIME',
+                      help='How long to wait for datapoints before assuming '
+                           'a collector is dead and restart it. '
+                           'default=%default')
     parser.add_option('--remove-inactive-collectors', dest='remove_inactive_collectors', action='store_true',
                       default=defaults['remove_inactive_collectors'], help='Remove collectors not sending data '
                       'in the max allowed inactivity interval')
     parser.add_option('--log-stdout', dest='logstdout', action='store_true',
                       help='Send log message to stdout.')
     parser.add_option('--logfile', dest='logfile', type='str',
-                      default=DEFAULT_LOG,
-                      help='Filename where logs are written to.')
-    parser.add_option('--reconnect-interval',dest='reconnectinterval', type='int',
-                      default=defaults['reconnectinterval'], metavar='RECONNECTINTERVAL',
-                      help='Number of seconds after which the connection to'
-                      'the TSD hostname reconnects itself. This is useful'
-                      'when the hostname is a multiple A record (RRDNS).')
+                      default=DEFAULT_LOG,
+                      help='Filename where logs are written to.')
+    parser.add_option('--reconnect-interval', dest='reconnectinterval', type='int',
+                      default=defaults['reconnectinterval'], metavar='RECONNECTINTERVAL',
+                      help='Number of seconds after which the connection to '
+                           'the TSD hostname reconnects itself. This is useful '
+                           'when the hostname is a multiple A record (RRDNS).')
     parser.add_option('--max-tags', dest='maxtags', type=int, default=defaults['maxtags'],
-                      help='The maximum number of tags to send to our TSD Instances')
+                      help='The maximum number of tags to send to our TSD Instances')
     parser.add_option('--http', dest='http', action='store_true', default=defaults['http'],
-                      help='Send the data via the http interface')
+                      help='Send the data via the http interface')
     parser.add_option('--http-api-path', dest='http_api_path', type='str', default=defaults['http_api_path'],
                       help='URL path to use for HTTP requests to TSD.')
     parser.add_option('--http-username', dest='http_username', default=defaults['http_username'],
@@ -1085,6 +1156,7 @@ def parse_cmdline(argv):
 
     return (options, args)
 
+
 def daemonize():
     """Performs the necessary dance to become a background daemon."""
     if os.fork():
@@ -1107,7 +1179,7 @@ def daemonize():
         try:
             os.close(fd)
         except OSError:  # This FD wasn't opened...
-            pass         # ... ignore the exception.
+            pass  # ... ignore the exception.
 
 
 def setup_python_path(collector_dir):
@@ -1172,13 +1244,6 @@ def main(argv):
     for sig in (signal.SIGTERM, signal.SIGINT):
         signal.signal(sig, shutdown_signal)
 
-    # Status server, if it's configured:
-    if options.monitoring_interface is not None:
-        status_server = StatusServer(options.monitoring_interface, options.monitoring_port, COLLECTORS)
-        thread = threading.Thread(target=status_server.serve_forever)
-        thread.setDaemon(True)  # keep thread from preventing shutdown
-        thread.start()
-
     # at this point we're ready to start processing, so start the ReaderThread
     # so we can have it running and pulling in data for us
     reader = ReaderThread(options.dedupinterval, options.evictinterval, options.deduponlyzero, options.namespace_prefix)
@@ -1198,10 +1263,18 @@ def splitHost(hostport):
             host, port = hostport.split(":")
             return (host, int(port))
         return (hostport, DEFAULT_PORT)
+
     options.hosts = [splitHost(host) for host in options.hosts.split(",")]
     if options.host != "localhost" or options.port != DEFAULT_PORT:
         options.hosts.append((options.host, options.port))
 
+    # Status server, if it's configured:
+    if options.monitoring_interface is not None:
+        status_server = StatusServer(options.monitoring_interface, options.monitoring_port, COLLECTORS, reader)
+        thread = threading.Thread(target=status_server.serve_forever)
+        thread.setDaemon(True)  # keep thread from preventing shutdown
+        thread.start()
+
     # and setup the sender to start writing out to the tsd
     sender = SenderThread(reader, options.dryrun, options.hosts,
                           not options.no_tcollector_stats, tags,
                           options.reconnectinterval,
@@ -1221,12 +1294,13 @@ def splitHost(hostport):
 
     # We're exiting, make sure we don't leave any collector behind.
for col in all_living_collectors(): - col.shutdown() + col.shutdown() LOG.debug('Shutting down -- joining the reader thread.') reader.join() LOG.debug('Shutting down -- joining the sender thread.') sender.join() + def stdin_loop(options, modules, sender, tags): """The main loop of the program that runs when we are in stdin mode.""" @@ -1241,6 +1315,7 @@ def stdin_loop(options, modules, sender, tags): % sum(1 for col in all_living_collectors())) next_heartbeat = now + 600 + def main_loop(options, modules, sender, tags): """The main loop of the program that runs when we're not in stdin mode.""" @@ -1296,15 +1371,16 @@ def load_config_module(name, options, tags): """ if isinstance(name, str): - LOG.info('Loading %s', name) - d = {} - # Strip the trailing .py - module = __import__(name[:-3], d, d) + LOG.info('Loading %s', name) + d = {} + # Strip the trailing .py + module = __import__(name[:-3], d, d) else: if PY3: - module = importlib.reload(name) # pylint: disable=no-member,undefined-variable + module = importlib.reload(name) # pylint: disable=no-member,undefined-variable else: - module = reload(name) # pylint: disable=undefined-variable + # noinspection PyUnresolvedReferences + module = reload(name) # pylint: disable=undefined-variable onload = module.__dict__.get('onload') if isinstance(onload, Callable): try: @@ -1401,10 +1477,10 @@ def shutdown_signal(signum, frame): def kill(proc, signum=signal.SIGTERM): - try: - os.killpg(proc.pid, signum) - except: # pylint: disable=bare-except - LOG.info('already killed: %s', proc.pid) + try: + os.killpg(proc.pid, signum) + except: # pylint: disable=bare-except + LOG.info('already killed: %s', proc.pid) def shutdown(): @@ -1447,7 +1523,7 @@ def reap_children(): # any other status code is an error and is logged. if status == 13: LOG.info('removing %s from the list of collectors (by request)', - col.name) + col.name) col.dead = True elif status != 0: LOG.warning('collector %s terminated after %d seconds with ' @@ -1458,6 +1534,7 @@ def reap_children(): register_collector(Collector(col.name, col.interval, col.filename, col.mtime, col.lastspawn)) + def check_children(options): """When a child process hasn't received a datapoint in a while, assume it's died in some fashion and restart it.""" @@ -1553,15 +1630,15 @@ def spawn_children(): col.killstate = 1 elif col.killstate == 1: LOG.error('error: %s (interval=%d, pid=%d) still not dead, ' - 'SIGKILL sent', - col.name, col.interval, col.proc.pid) + 'SIGKILL sent', + col.name, col.interval, col.proc.pid) kill(col.proc, signal.SIGKILL) col.nextkill = now + 5 col.killstate = 2 else: LOG.error('error: %s (interval=%d, pid=%d) needs manual ' - 'intervention to kill it', - col.name, col.interval, col.proc.pid) + 'intervention to kill it', + col.name, col.interval, col.proc.pid) col.nextkill = now + 300 @@ -1602,8 +1679,8 @@ def populate_collectors(coldir): # this... 
if col.interval != interval:
                 LOG.error('two collectors with the same name %s and '
-                          'different intervals %d and %d',
-                          colname, interval, col.interval)
+                          'different intervals %d and %d',
+                          colname, interval, col.interval)
                 continue
 
             # we have to increase the generation or we will kill
@@ -1626,7 +1703,7 @@ def populate_collectors(coldir):
     for col in all_collectors():
         if col.generation < GENERATION:
             LOG.info('collector %s removed from the filesystem, forgetting',
-                     col.name)
+                     col.name)
             col.shutdown()
             to_delete.append(col.name)
     for name in to_delete:

From f7d97a7939a81018ffc85875b1332110b89366a1 Mon Sep 17 00:00:00 2001
From: Jonathan Creasy
Date: Wed, 28 Oct 2020 11:17:30 -0400
Subject: [PATCH 2/2] Fixed issue with tests and also allowed procstats to exit gracefully

---
 collectors/0/procstats.py | 19 +++++++++++--------
 tests.py                  |  3 ++-
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/collectors/0/procstats.py b/collectors/0/procstats.py
index 0dd8e042..d103d23b 100755
--- a/collectors/0/procstats.py
+++ b/collectors/0/procstats.py
@@ -142,14 +142,17 @@ def cpus_csets(cpuset_path):
 
 def main():
     """procstats main loop"""
-
-    f_uptime = open("/proc/uptime", "r")
-    f_meminfo = open("/proc/meminfo", "r")
-    f_vmstat = open("/proc/vmstat", "r")
-    f_stat = open("/proc/stat", "r")
-    f_loadavg = open("/proc/loadavg", "r")
-    f_entropy_avail = open("/proc/sys/kernel/random/entropy_avail", "r")
-    f_interrupts = open("/proc/interrupts", "r")
+    try:
+        f_uptime = open("/proc/uptime", "r")
+        f_meminfo = open("/proc/meminfo", "r")
+        f_vmstat = open("/proc/vmstat", "r")
+        f_stat = open("/proc/stat", "r")
+        f_loadavg = open("/proc/loadavg", "r")
+        f_entropy_avail = open("/proc/sys/kernel/random/entropy_avail", "r")
+        f_interrupts = open("/proc/interrupts", "r")
+    except IOError as exc:  # FileNotFoundError is PY3-only; IOError covers both runtimes
+        utils.err('failed to open a /proc file: %s' % exc)
+        sys.exit(1)
 
     f_scaling = "/sys/devices/system/cpu/cpu%s/cpufreq/%s_freq"
     f_scaling_min = dict([])
diff --git a/tests.py b/tests.py
index b84a7c14..87f323b1 100755
--- a/tests.py
+++ b/tests.py
@@ -158,7 +158,8 @@ def test_endtoend(self):
             "a": tcollector.Collector("mycollector", 5, "a.py"),  # pylint:disable=no-member
             "b": tcollector.Collector("second", 3, "b.py"),  # pylint:disable=no-member
         }
-        server = tcollector.StatusServer("127.0.0.1", 32025, collectors)  # pylint:disable=no-member
+        reader = {}
+        server = tcollector.StatusServer("127.0.0.1", 32025, collectors, reader)  # pylint:disable=no-member
         # runs in background until test suite exits :( but it works.
         thread = threading.Thread(target=server.serve_forever)
         thread.setDaemon(True)
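
[Editor's note: with a StatusServer running, the two endpoints can be spot-checked from a short script; the host and port mirror the test above, and this snippet is an illustrative sketch rather than part of the patch.]

try:
    from urllib.request import urlopen  # PY3
except ImportError:
    from urllib2 import urlopen  # PY2, matching tcollector's dual support

# /status (also served at /) returns a JSON list with one object per collector.
print(urlopen("http://127.0.0.1:32025/status").read())

# /metrics returns Prometheus text format, one line per sample.
print(urlopen("http://127.0.0.1:32025/metrics").read())

[Note that the test passes a plain dict as the reader, so only /status is exercised there; /metrics needs a real ReaderThread behind the server to supply lines_collected, lines_dropped, and latestMetrics.]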