diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 90515db..09cefb0 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -20,3 +20,7 @@ Nebula contributors * **[Viney Yadav](https://github.com/vineyyadav/)** * Fix for #145 + +* **[Paritosh Ramanan](https://github.com/paritoshpr/)** + + * Redis Support \ No newline at end of file diff --git a/functions/docker_engine/docker_engine.py b/functions/docker_engine/docker_engine.py index 2719ed1..0fbf5dd 100644 --- a/functions/docker_engine/docker_engine.py +++ b/functions/docker_engine/docker_engine.py @@ -111,7 +111,10 @@ def pull_image(self, image_name, version_tag="latest"): try: print(image_name) for line in self.cli.pull(image_name, str(version_tag), stream=True): - print(json.dumps(json.loads(line), indent=4)) + try: + print(json.dumps(json.loads(line), indent=4)) + except Exception as e: + print(line) except Exception as e: print(e, file=sys.stderr) print("problem pulling image " + image_name + ":" + str(version_tag)) diff --git a/functions/misc/server.py b/functions/misc/server.py index dc39cc5..7f51d77 100644 --- a/functions/misc/server.py +++ b/functions/misc/server.py @@ -67,7 +67,6 @@ def get_memory_usage(): print("error getting memory usage") os._exit(2) - # return host fqdn def get_fqdn(): try: diff --git a/functions/reporting/redis.py b/functions/reporting/redis.py new file mode 100644 index 0000000..41c823b --- /dev/null +++ b/functions/reporting/redis.py @@ -0,0 +1,47 @@ +import sys, json, redis, pickle + +class RedisConnection: + + def __init__(self, redis_host, redis_port, redis_auth_token, expire_time,topic="nebula-reports"): + self.topic = topic + self.expireTime=None + if expire_time is not None: + self.expireTime = int(expire_time) + self.redisObj = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_auth_token) + + @staticmethod + def on_send_error(excp): + print("Report delivery to redis failed: " + str(excp)) + + def push_report(self, report): + try: + key = self.topic+"_"+str(report["report_creation_time"])+"_"+str(report["device_group"])+"@"+str(report["hostname"]) + self.redisObj.set(key, pickle.dumps(report),ex=self.expireTime,) + except Exception as e: + print(e, file=sys.stderr) + print("Report delivery to redis failed") + + def map_container_id(self,report): + container_info = report["apps_containers"] + for container in container_info: + + try: + container_id = container["id"] + container_name = container["name"].split("-")[0].replace("/","") + container_time = container["read"] + except Exception as e: + print(e, file=sys.stderr) + print("Error reporting containers on host {}".format(report["hostname"])) + continue + + try: + container_map = {} + container_map["name"] = container_name + container_map["time"] = container_time + + #keys can be signed here with timestamp for extra protection + self.redisObj.set("container_"+container_id, pickle.dumps(container_map),) + + except Exception as e: + print(e, file=sys.stderr) + print("Container Map push to redis failed for {}".format(container_name)) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e457cc8..1c60a40 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,3 +25,4 @@ toml==0.10.2 urllib3==1.26.6 websocket-client==1.2.1 xmltodict==0.12.0 +redis==3.5.3 diff --git a/worker.py b/worker.py index 01d1b2b..b01ebd3 100644 --- a/worker.py +++ b/worker.py @@ -1,6 +1,7 @@ from NebulaPythonSDK import Nebula from functions.reporting.reporting import * from functions.reporting.kafka import * +from functions.reporting.redis import * from functions.docker_engine.docker_engine import * from functions.misc.server import * from functions.misc.cron_schedule import * @@ -219,6 +220,7 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info): parser = ParseIt(config_location="config", recurse=True) print("reading config variables") + # the following config variables are for configuring Nebula workers nebula_manager_auth_user = parser.read_configuration_variable("nebula_manager_auth_user", default_value=None) nebula_manager_auth_password = parser.read_configuration_variable("nebula_manager_auth_password", @@ -242,6 +244,13 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info): # is mandatory reporting_fail_hard = parser.read_configuration_variable("reporting_fail_hard", default_value=True) report_on_update_only = parser.read_configuration_variable("report_on_update_only", default_value=False) + + redis_host = parser.read_configuration_variable("redis_host", default_value=None) + redis_port = parser.read_configuration_variable("redis_port", default_value=None) + redis_auth_token = parser.read_configuration_variable("redis_auth_token", default_value=None) + redis_expire_time = parser.read_configuration_variable("redis_expire_time", default_value=2*nebula_manager_check_in_time) + redis_key_prefix = parser.read_configuration_variable("redis_key_prefix", default_value="nebula-reports") + kafka_bootstrap_servers = parser.read_configuration_variable("kafka_bootstrap_servers", default_value=None) kafka_security_protocol = parser.read_configuration_variable("kafka_security_protocol", default_value="PLAINTEXT") @@ -361,6 +370,21 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info): print("failed creating reporting kafka connection object - exiting") os._exit(2) + if redis_host is not None: + try: + print("creating reporting redis connection object") + redis_connection = RedisConnection(redis_host, redis_port=redis_port, redis_auth_token=redis_auth_token, + expire_time=redis_expire_time, topic=redis_key_prefix) + except Exception as e: + print(e, file=sys.stderr) + if reporting_fail_hard is False: + print("failed creating reporting redis connection object") + pass + else: + print("failed creating reporting redis connection object - exiting") + os._exit(2) + + if kafka_bootstrap_servers is not None or redis_host is not None: try: reporting_object = ReportingDocument(docker_socket, device_group) except Exception as e: @@ -492,6 +516,21 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info): print("failed reporting state to kafka - exiting") os._exit(2) + if redis_host is not None: + try: + if monotonic_id_increase is True or report_on_update_only is False: + report = reporting_object.current_status_report(local_device_group_info, monotonic_id_increase) + redis_connection.push_report(report) + redis_connection.map_container_id(report) + except Exception as e: + print(e, file=sys.stderr) + if reporting_fail_hard is False: + print("failed reporting state to redis") + pass + else: + print("failed reporting state to redis - exiting") + os._exit(2) + except Exception as e: print(e, file=sys.stderr) print("failed main loop - exiting")