From f6db954bfd8877eb31743297241c458505af11da Mon Sep 17 00:00:00 2001 From: Frederic Brin Date: Tue, 5 Jan 2021 11:50:22 +0100 Subject: [PATCH 1/6] Support all protobuf fields v1 * Add specific functions to parse Response, RRs and common fields * Extended use of dnspython lib (especialy for rdata mgmt) --- pdns_protobuf_receiver/receiver.py | 181 ++++++++++++++++++++++++++--- 1 file changed, 166 insertions(+), 15 deletions(-) diff --git a/pdns_protobuf_receiver/receiver.py b/pdns_protobuf_receiver/receiver.py index 32abb7f..d77c149 100644 --- a/pdns_protobuf_receiver/receiver.py +++ b/pdns_protobuf_receiver/receiver.py @@ -23,17 +23,20 @@ # SOFTWARE. import argparse +import binascii import logging import asyncio import socket import json import sys +from datetime import datetime, timezone + import dns.rdatatype +import dns.rdataclass +import dns.rdata import dns.rcode -from datetime import datetime, timezone - # wget https://raw.githubusercontent.com/PowerDNS/dnsmessage/master/dnsmessage.proto # wget https://github.com/protocolbuffers/protobuf/releases/download/v3.12.2/protoc-3.12.2-linux-x86_64.zip # python3 -m pip install protobuf @@ -61,15 +64,165 @@ PBDNSMESSAGE_SOCKETPROTOCOL = {1: "UDP", 2: "TCP"} +PBDNSMESSAGE_POLICYTYPE = { + 1: "UNKNOWN", + 2: "QNAME", + 3: "CLIENTIP", + 4: "RESPONSEIP", + 5: "NSDNAME", + 6: "NSDNAME", +} + + +def get_rdata_attributes(cls, exclude_methods=True): + """ + Extract attributes to be set in rdata Json Dict + + Extract from dnspython class the attributes that + will be used to populate the rdata record + """ + base_attrs = dir(type("dummy", (object,), {})) + this_cls_attrs = dir(cls) + res = [] + for attr in this_cls_attrs: + if base_attrs.count(attr) or (callable(getattr(cls, attr)) and exclude_methods): + continue + if attr in ["rdclass", "rdtype", "__slots__"]: + continue + res += [attr] + return res + + +def parse_pb_msg(dns_pb2, dns_msg): + """ + Parse Common Fields in Protobuf PowerDNS Messages + """ + pb_fields_map = { + "type": ["dns_message", PBDNSMESSAGE_TYPE], # 1 + "messageId": "message_id", # 2 + "serverIdentity": "server_identity", # 3 + "socketFamily": ["socket_family", PBDNSMESSAGE_SOCKETFAMILY], # 4 + "socketProtocol": ["socket_protocol", PBDNSMESSAGE_SOCKETPROTOCOL], # 5 + "inBytes": "bytes", # 8 + "id": "dns_id", # 11 + # originalRequestorSubnet # 14 + # requestorId # 15 + "initialRequestId": "initial_request_id", # 16 + "deviceId": "device_id", # 17 + "newlyObservedDomain": "nod", # 18 + "deviceName": "device_name", # 19 + "fromPort": "from_port", # 20 + "toPort": "to_port", # 21 + } + + # print(dns_pb2) + for key, val in pb_fields_map.items(): + if dns_pb2.HasField(key): + if isinstance(val, str): + if key in {"messageId", "initialRequestId"}: + dns_msg[val] = binascii.hexlify( + bytearray(getattr(dns_pb2, key)) + ).decode() + else: + res = getattr(dns_pb2, key) + if isinstance(res, bytes): + dns_msg[val] = res.decode() + else: + dns_msg[val] = res + else: + dns_msg[val[0]] = val[1][getattr(dns_pb2, key)] + + +def parse_pb_msg_query(dns_pb2, dns_msg): + """ + Parse RRS Fields in Protobuf PowerDNS Messages + """ + pb_fields_map = { + } + +def parse_pb_msg_response(dns_pb2, dns_msg): + """ + Parse Response Fields in Protobuf PowerDNS Messages + """ + pb_fields_map = { + "rcode": "return_code", # 1 + "appliedPolicy": "applied_policy", # 3 + "tags": "tags", # 4 + "appliedPolicyType": ["applied_policy_type", PBDNSMESSAGE_POLICYTYPE], # 7 + "appliedPolicyTrigger": "applied_policy_trigger", # 8 + "appliedPolicyHit": "applied_policy_hit", # 9 + } + + dns_msg["response"] = {} + resp = dns_msg["response"] + + for key, val in pb_fields_map.items(): + if key == "tags": + try: + tags = [] + for i in getattr(dns_pb2.response, val): + tags.append(i) + if len(tags) > 0: + resp["tags"] = tags + except AttributeError: + pass + elif key == "rcode": + if dns_pb2.response.rcode == 65536: + dns_msg["response"]["return_code"] = "NETWORK_ERROR" + else: + dns_msg["response"]["return_code"] = dns.rcode.to_text(dns_pb2.response.rcode) + elif dns_pb2.response.HasField(key): + if isinstance(val, str): + res = getattr(dns_pb2.response, key) + resp[val] = res + else: + resp[val[0]] = val[1][getattr(dns_pb2.response, key)] + + +def parse_pb_msg_rrs(dns_pb2, dns_msg): + """ + Parse RRS Fields in Protobuf PowerDNS Messages + """ + pb_fields_map = { + "name": "name", # 1 + "type": "type", # 2 + "class": "class", # 3 + "ttl": "ttl", # 4 + "rdata": "rdata", # 5 + "udr": "udr", # 6 + } + + rrs = [] + + for rr in dns_pb2.response.rrs: + rr_dict = {} + for key, val in pb_fields_map.items(): + res = getattr(rr, key) + if key == "rdata": + rr_dict[val] = {} + rdata = dns.rdata.from_wire( + rr_dict["class"], rr_dict["type"], res, 0, len(res) + ) + for k in get_rdata_attributes(rdata): + rr_dict[val][k] = getattr(rdata, k) + elif key == "class": + rr_dict[val] = dns.rdataclass.to_text(res) + elif key == "type": + rr_dict[val] = dns.rdatatype.to_text(res) + else: + rr_dict[val] = res + rrs.append(rr_dict) + + if len(rrs) > 0: + dns_msg["response"]["rrs"] = rrs + async def cb_onpayload(dns_pb2, payload, tcp_writer, debug_mode, loop): """on dnsmessage protobuf2""" dns_pb2.ParseFromString(payload) dns_msg = {} - dns_msg["dns_message"] = PBDNSMESSAGE_TYPE[dns_pb2.type] - dns_msg["socket_family"] = PBDNSMESSAGE_SOCKETFAMILY[dns_pb2.socketFamily] - dns_msg["socket protocol"] = PBDNSMESSAGE_SOCKETPROTOCOL[dns_pb2.socketProtocol] + parse_pb_msg(dns_pb2, dns_msg) dns_msg["from_address"] = "0.0.0.0" from_addr = getattr(dns_pb2, "from") @@ -110,6 +263,9 @@ async def cb_onpayload(dns_pb2, payload, tcp_writer, debug_mode, loop): time_latency = round(float(time_rsp) - float(time_req), 6) + parse_pb_msg_response(dns_pb2, dns_msg) + parse_pb_msg_rrs(dns_pb2, dns_msg) + dns_msg["query_time"] = datetime.fromtimestamp( float(time_req), tz=timezone.utc ).isoformat() @@ -119,14 +275,9 @@ async def cb_onpayload(dns_pb2, payload, tcp_writer, debug_mode, loop): dns_msg["latency"] = time_latency - dns_msg["query_type"] = dns.rdatatype.to_text(dns_pb2.question.qType) - dns_msg["query_name"] = dns_pb2.question.qName - - if dns_pb2.response.rcode == 65536: - dns_msg["return_code"] = "NETWORK_ERROR" - else: - dns_msg["return_code"] = dns.rcode.to_text(dns_pb2.response.rcode) - dns_msg["bytes"] = dns_pb2.inBytes + dns_msg["query"] = {} + dns_msg["query"]["type"] = dns.rdatatype.to_text(dns_pb2.question.qType) + dns_msg["query"]["name"] = dns_pb2.question.qName dns_json = json.dumps(dns_msg) @@ -138,8 +289,8 @@ async def cb_onpayload(dns_pb2, payload, tcp_writer, debug_mode, loop): # exit if we lost the connection with the remote collector loop.stop() raise Exception("connection lost with remote") - else: - tcp_writer.write(dns_json.encode() + b"\n") + + tcp_writer.write(dns_json.encode() + b"\n") async def cb_onconnect(reader, writer, tcp_writer, debug_mode): From b5db6a0c24f5ccf883e541154838e3a1dbf1a9b4 Mon Sep 17 00:00:00 2001 From: Frederic Brin Date: Tue, 5 Jan 2021 12:27:48 +0100 Subject: [PATCH 2/6] V1.1: Rework layout * Add a dedicated Query Field parser * Move from/to parsing to the Common parser --- pdns_protobuf_receiver/receiver.py | 50 +++++++++++++++++------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/pdns_protobuf_receiver/receiver.py b/pdns_protobuf_receiver/receiver.py index d77c149..1e350d7 100644 --- a/pdns_protobuf_receiver/receiver.py +++ b/pdns_protobuf_receiver/receiver.py @@ -103,6 +103,8 @@ def parse_pb_msg(dns_pb2, dns_msg): "serverIdentity": "server_identity", # 3 "socketFamily": ["socket_family", PBDNSMESSAGE_SOCKETFAMILY], # 4 "socketProtocol": ["socket_protocol", PBDNSMESSAGE_SOCKETPROTOCOL], # 5 + "from": "from_address", # 6 + "to": "to_address", # 7 "inBytes": "bytes", # 8 "id": "dns_id", # 11 # originalRequestorSubnet # 14 @@ -118,7 +120,14 @@ def parse_pb_msg(dns_pb2, dns_msg): # print(dns_pb2) for key, val in pb_fields_map.items(): if dns_pb2.HasField(key): - if isinstance(val, str): + if key in ["from", "to"]: + addr = getattr(dns_pb2, key) + if len(addr) > 0: + if dns_pb2.socketFamily == PBDNSMessage.SocketFamily.INET: + dns_msg[val] = socket.inet_ntop(socket.AF_INET, addr) + if dns_pb2.socketFamily == PBDNSMessage.SocketFamily.INET6: + dns_msg[val] = socket.inet_ntop(socket.AF_INET6, addr) + elif isinstance(val, str): if key in {"messageId", "initialRequestId"}: dns_msg[val] = binascii.hexlify( bytearray(getattr(dns_pb2, key)) @@ -135,11 +144,28 @@ def parse_pb_msg(dns_pb2, dns_msg): def parse_pb_msg_query(dns_pb2, dns_msg): """ - Parse RRS Fields in Protobuf PowerDNS Messages + Parse Query Fields in Protobuf PowerDNS Messages """ pb_fields_map = { + "qName": "name", + "qType": "type", + "qClass": "class" } + query = dns_pb2.question + query_d = {} + for key, val in pb_fields_map.items(): + if query.HasField(key): + if key == "qType": + query_d[val] = dns.rdatatype.to_text(getattr(query, key)) + elif key == "qClass": + query_d[val] = dns.rdataclass.to_text(getattr(query, key)) + else: + query_d[val] = getattr(query, key) + + dns_msg["query"] = query_d + + def parse_pb_msg_response(dns_pb2, dns_msg): """ Parse Response Fields in Protobuf PowerDNS Messages @@ -224,22 +250,6 @@ async def cb_onpayload(dns_pb2, payload, tcp_writer, debug_mode, loop): dns_msg = {} parse_pb_msg(dns_pb2, dns_msg) - dns_msg["from_address"] = "0.0.0.0" - from_addr = getattr(dns_pb2, "from") - if len(from_addr): - if dns_pb2.socketFamily == PBDNSMessage.SocketFamily.INET: - dns_msg["from_address"] = socket.inet_ntop(socket.AF_INET, from_addr) - if dns_pb2.socketFamily == PBDNSMessage.SocketFamily.INET6: - dns_msg["from_address"] = socket.inet_ntop(socket.AF_INET6, from_addr) - - dns_msg["to_address"] = "0.0.0.0" - to_addr = getattr(dns_pb2, "to") - if len(to_addr): - if dns_pb2.socketFamily == PBDNSMessage.SocketFamily.INET: - dns_msg["to_address"] = socket.inet_ntop(socket.AF_INET, to_addr) - if dns_pb2.socketFamily == PBDNSMessage.SocketFamily.INET6: - dns_msg["to_address"] = socket.inet_ntop(socket.AF_INET6, to_addr) - time_req = 0 time_rsp = 0 time_latency = 0 @@ -275,9 +285,7 @@ async def cb_onpayload(dns_pb2, payload, tcp_writer, debug_mode, loop): dns_msg["latency"] = time_latency - dns_msg["query"] = {} - dns_msg["query"]["type"] = dns.rdatatype.to_text(dns_pb2.question.qType) - dns_msg["query"]["name"] = dns_pb2.question.qName + parse_pb_msg_query(dns_pb2, dns_msg) dns_json = json.dumps(dns_msg) From 4910d5d65f129fe85b7f5ee5e92ecd30b63d1a5c Mon Sep 17 00:00:00 2001 From: Frederic Brin Date: Tue, 5 Jan 2021 12:42:05 +0100 Subject: [PATCH 3/6] Black the code --- pdns_protobuf_receiver/receiver.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pdns_protobuf_receiver/receiver.py b/pdns_protobuf_receiver/receiver.py index 1e350d7..cfbbd2e 100644 --- a/pdns_protobuf_receiver/receiver.py +++ b/pdns_protobuf_receiver/receiver.py @@ -117,7 +117,7 @@ def parse_pb_msg(dns_pb2, dns_msg): "toPort": "to_port", # 21 } - # print(dns_pb2) + print(dns_pb2) for key, val in pb_fields_map.items(): if dns_pb2.HasField(key): if key in ["from", "to"]: @@ -146,11 +146,7 @@ def parse_pb_msg_query(dns_pb2, dns_msg): """ Parse Query Fields in Protobuf PowerDNS Messages """ - pb_fields_map = { - "qName": "name", - "qType": "type", - "qClass": "class" - } + pb_fields_map = {"qName": "name", "qType": "type", "qClass": "class"} query = dns_pb2.question query_d = {} @@ -196,7 +192,9 @@ def parse_pb_msg_response(dns_pb2, dns_msg): if dns_pb2.response.rcode == 65536: dns_msg["response"]["return_code"] = "NETWORK_ERROR" else: - dns_msg["response"]["return_code"] = dns.rcode.to_text(dns_pb2.response.rcode) + dns_msg["response"]["return_code"] = dns.rcode.to_text( + dns_pb2.response.rcode + ) elif dns_pb2.response.HasField(key): if isinstance(val, str): res = getattr(dns_pb2.response, key) From 6d979d9a338a211f12d3d27cde10c4408ee52c61 Mon Sep 17 00:00:00 2001 From: Frederic Brin Date: Tue, 5 Jan 2021 15:54:33 +0100 Subject: [PATCH 4/6] Support originalRequestorSubnet and requestorId originalRequestorSubnet support is loosely based on ProtobufLogger.py in PDNS source tree. --- pdns_protobuf_receiver/receiver.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pdns_protobuf_receiver/receiver.py b/pdns_protobuf_receiver/receiver.py index cfbbd2e..22d7422 100644 --- a/pdns_protobuf_receiver/receiver.py +++ b/pdns_protobuf_receiver/receiver.py @@ -107,8 +107,8 @@ def parse_pb_msg(dns_pb2, dns_msg): "to": "to_address", # 7 "inBytes": "bytes", # 8 "id": "dns_id", # 11 - # originalRequestorSubnet # 14 - # requestorId # 15 + "originalRequestorSubnet": "original_requestor_subnet", # 14 + "requestorId": "requestor_id", # 15 "initialRequestId": "initial_request_id", # 16 "deviceId": "device_id", # 17 "newlyObservedDomain": "nod", # 18 @@ -117,7 +117,7 @@ def parse_pb_msg(dns_pb2, dns_msg): "toPort": "to_port", # 21 } - print(dns_pb2) + # print(dns_pb2) for key, val in pb_fields_map.items(): if dns_pb2.HasField(key): if key in ["from", "to"]: @@ -127,6 +127,12 @@ def parse_pb_msg(dns_pb2, dns_msg): dns_msg[val] = socket.inet_ntop(socket.AF_INET, addr) if dns_pb2.socketFamily == PBDNSMessage.SocketFamily.INET6: dns_msg[val] = socket.inet_ntop(socket.AF_INET6, addr) + elif key == "originalRequestorSubnet": + ors = getattr(dns_pb2, key) + if len(ors) == 4: + dns_msg[val] = socket.inet_ntop(socket.AF_INET, ors) + elif len(ors) == 16: + dns_msg[val] = socket.inet_ntop(socket.AF_INET6, ors) elif isinstance(val, str): if key in {"messageId", "initialRequestId"}: dns_msg[val] = binascii.hexlify( From e711d5b753d3ff8f01503f93cb1f00db3b685fe6 Mon Sep 17 00:00:00 2001 From: Frederic Brin Date: Fri, 8 Jan 2021 14:48:11 +0100 Subject: [PATCH 5/6] Fix rdata parsing Tested on the following query types (PowerDNS recursor export fields): + A + AAAA + CNAME + TXT + MX + NS + PTR + SRV Not tested: 'SPF': is deprecated, I don't know if there is still any usage. cf: https://tools.ietf.org/html/rfc7208#page-11 A small remark: * Transmitted MX & SRV fields are incomplete (Missing priority/ports ... integers). This may need an upstream fix. --- pdns_protobuf_receiver/receiver.py | 37 ++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/pdns_protobuf_receiver/receiver.py b/pdns_protobuf_receiver/receiver.py index 22d7422..0b789c7 100644 --- a/pdns_protobuf_receiver/receiver.py +++ b/pdns_protobuf_receiver/receiver.py @@ -230,11 +230,40 @@ def parse_pb_msg_rrs(dns_pb2, dns_msg): res = getattr(rr, key) if key == "rdata": rr_dict[val] = {} - rdata = dns.rdata.from_wire( - rr_dict["class"], rr_dict["type"], res, 0, len(res) - ) + if rr.type in [dns.rdatatype.A, dns.rdatatype.AAAA]: + rdata = dns.rdata.from_wire( + rr_dict["class"], rr_dict["type"], res, 0, len(res) + ) + else: + try: + rdata = dns.rdata.from_text( + rr_dict["class"], rr_dict["type"], res.decode() + ) + except UnicodeDecodeError: + rdata = dns.rdata.from_wire( + rr_dict["class"], rr_dict["type"], res, 0, len(res) + ) + except dns.exception.SyntaxError as e: + # Fix for MX & SRV as info sent by Recursors does + # not contain the preference/priority/port (int value) + # like "10 mymx.e.com" for MX + # Stays here in case it is fixed upstream + if rr.type == dns.rdatatype.MX: + rr_dict[val]["exchange"] = str(res.decode()) + break + elif rr.type == dns.rdatatype.SRV: + rr_dict[val]["target"] = str(res.decode()) + break + else: + raise e + for k in get_rdata_attributes(rdata): - rr_dict[val][k] = getattr(rdata, k) + if rr.type == dns.rdatatype.TXT: + text_list = getattr(rdata, k) + text_list = [str(i.decode()) for i in text_list] + rr_dict[val][k] = " ".join(text_list) + else: + rr_dict[val][k] = str(getattr(rdata, k)) elif key == "class": rr_dict[val] = dns.rdataclass.to_text(res) elif key == "type": From a4cf405c48f6b5a2a106e2815d5affb3f4eedb80 Mon Sep 17 00:00:00 2001 From: Frederic Brin Date: Thu, 20 May 2021 11:19:27 +0200 Subject: [PATCH 6/6] Fix issue with Protobof Message Version DNSdist is not in line with PDNS Recursor Protobuf Message version. This is require catching the Exception raised (And silently ignored) --- pdns_protobuf_receiver/receiver.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pdns_protobuf_receiver/receiver.py b/pdns_protobuf_receiver/receiver.py index 0b789c7..3934997 100644 --- a/pdns_protobuf_receiver/receiver.py +++ b/pdns_protobuf_receiver/receiver.py @@ -201,12 +201,18 @@ def parse_pb_msg_response(dns_pb2, dns_msg): dns_msg["response"]["return_code"] = dns.rcode.to_text( dns_pb2.response.rcode ) - elif dns_pb2.response.HasField(key): - if isinstance(val, str): - res = getattr(dns_pb2.response, key) - resp[val] = res - else: - resp[val[0]] = val[1][getattr(dns_pb2.response, key)] + else: + try: + assert dns_pb2.response.HasField(key) + if isinstance(val, str): + res = getattr(dns_pb2.response, key) + resp[val] = res + else: + resp[val[0]] = val[1][getattr(dns_pb2.response, key)] + except AssertionError: + # take into account fields map that may not + # exist due to pb message version + pass def parse_pb_msg_rrs(dns_pb2, dns_msg):