Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Redis Cache support for worker reports #230

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ Nebula contributors
* **[Viney Yadav](https://github.com/vineyyadav/)**

* Fix for #145

* **[Paritosh Ramanan](https://github.com/paritoshpr/)**

* Redis Support
1 change: 0 additions & 1 deletion functions/misc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def get_memory_usage():
print("error getting memory usage")
os._exit(2)


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP8 requires 2 empty lines between functions so this shouldn't be removed

# return host fqdn
def get_fqdn():
try:
Expand Down
22 changes: 22 additions & 0 deletions functions/reporting/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import sys, json, redis, pickle

class RedisConnection:

def __init__(self, redis_host, redis_port, redis_auth_token, expire_time,topic="nebula-reports"):
self.topic = topic
self.expireTime=None
if expire_time is not None:
self.expireTime = int(expire_time)
self.redisObj = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_auth_token)

@staticmethod
def on_send_error(excp):
print("Report delivery to redis failed: " + str(excp))

def push_report(self, report):
try:
key = self.topic+"_"+str(report["report_creation_time"])+"_"+str(report["device_group"])+"@"+str(report["hostname"])
self.redisObj.set(key, pickle.dumps(report),ex=self.expireTime,)
except Exception as e:
print(e, file=sys.stderr)
print("Report delivery to redis failed")
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ toml==0.10.2
urllib3==1.26.6
websocket-client==1.2.1
xmltodict==0.12.0
redis==3.5.3
38 changes: 38 additions & 0 deletions worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from NebulaPythonSDK import Nebula
from functions.reporting.reporting import *
from functions.reporting.kafka import *
from functions.reporting.redis import *
from functions.docker_engine.docker_engine import *
from functions.misc.server import *
from functions.misc.cron_schedule import *
Expand Down Expand Up @@ -219,6 +220,7 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info):
parser = ParseIt(config_location="config", recurse=True)

print("reading config variables")

# the following config variables are for configuring Nebula workers
nebula_manager_auth_user = parser.read_configuration_variable("nebula_manager_auth_user", default_value=None)
nebula_manager_auth_password = parser.read_configuration_variable("nebula_manager_auth_password",
Expand All @@ -242,6 +244,13 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info):
# is mandatory
reporting_fail_hard = parser.read_configuration_variable("reporting_fail_hard", default_value=True)
report_on_update_only = parser.read_configuration_variable("report_on_update_only", default_value=False)

redis_host = parser.read_configuration_variable("redis_host", default_value=None)
redis_port = parser.read_configuration_variable("redis_port", default_value=None)
redis_auth_token = parser.read_configuration_variable("redis_auth_token", default_value=None)
redis_expire_time = parser.read_configuration_variable("redis_expire_time", default_value=2*nebula_manager_check_in_time)
redis_key_prefix = parser.read_configuration_variable("redis_key_prefix", default_value="nebula-reports")

kafka_bootstrap_servers = parser.read_configuration_variable("kafka_bootstrap_servers", default_value=None)
kafka_security_protocol = parser.read_configuration_variable("kafka_security_protocol",
default_value="PLAINTEXT")
Expand Down Expand Up @@ -361,6 +370,21 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info):
print("failed creating reporting kafka connection object - exiting")
os._exit(2)

if redis_host is not None:
try:
print("creating reporting redis connection object")
redis_connection = RedisConnection(redis_host, redis_port=redis_port, redis_auth_token=redis_auth_token,
expire_time=redis_expire_time, topic=redis_key_prefix)
except Exception as e:
print(e, file=sys.stderr)
if reporting_fail_hard is False:
print("failed creating reporting redis connection object")
pass
else:
print("failed creating reporting redis connection object - exiting")
os._exit(2)

if kafka_bootstrap_servers is not None or redis_host is not None:
try:
reporting_object = ReportingDocument(docker_socket, device_group)
except Exception as e:
Expand Down Expand Up @@ -492,6 +516,20 @@ def get_device_group_info(nebula_connection_object, device_group_to_get_info):
print("failed reporting state to kafka - exiting")
os._exit(2)

if redis_host is not None:
try:
if monotonic_id_increase is True or report_on_update_only is False:
report = reporting_object.current_status_report(local_device_group_info, monotonic_id_increase)
redis_connection.push_report(report)
except Exception as e:
print(e, file=sys.stderr)
if reporting_fail_hard is False:
print("failed reporting state to redis")
pass
else:
print("failed reporting state to redis - exiting")
os._exit(2)

except Exception as e:
print(e, file=sys.stderr)
print("failed main loop - exiting")
Expand Down