
BIE Kafka Client


Intro

Status: Currently, this page is a Work-In-Progress Tech Spec. Once implementation is completed, update and transition this page to become documentation for the BIE Kafka Client microservice.

Epic and tickets: Consume BIP contention event streams #1642

Goal: Implement a BIE Kafka Client microservice that other VRO components can use to subscribe to and handle BIE's Event Stream

Diagram

graph TD

subgraph Kafka
    subgraph BIE[BIE env]
        bie-kafka[kafka-server]
    end

    subgraph test_env[Test env]
        mock-kafka[mock-kafka-server]
    end
end

%% kafka-client -->|subscribe| bie-kafka
bie-kafka -.->|event| kafka-client

subgraph VRO
    subgraph svc-bie-kafka
        kafka-client -.-> kafkaEvent-notifier
    end
    subgraph MQ["(Platform) MQ"]
%%        subscribeQ[\subscribe Queue/]
        kafkaEventQ[\X-created kafkaEvent Exchange/]
        kafkaEventQ2[\X-deleted kafkaEvent Exchange/]
    end
    subgraph xample-workflow
%%        subscribe(subscribe to kafka topic)
        event-handler(kafkaEvent-handler) -.-> saveToDB
    end

%%    subscribe --> subscribeQ --> svc-bie-kafka
    kafkaEvent-notifier -.-> kafkaEventQ & kafkaEventQ2 -.-> event-handler
    saveToDB -.-> DB
    DB[("(Platform)\nDB")]
end

DB --> DataDog

style DB fill:#aea,stroke-width:4px
style svc-bie-kafka fill:#AAF,stroke-width:2px,stroke:#777
style xample-workflow fill:#FAA,stroke-width:2px,stroke:#777
style test_env fill:#AAA,stroke-width:2px,stroke:#777
style Kafka fill:#EEE,stroke-width:2px,stroke:#777
style MQ fill:#AA7,stroke-width:2px,stroke:#997
  • Dotted arrows represent many events flowing as a result of the topic subscription.
  • There may be one kafkaEvent-handler that handles multiple Kafka event exchanges, or one kafkaEvent-handler per exchange.

When VRO is deployed to LHDI, VRO uses the respective BIE env (TBD in Establish connectivity between VRO and BIE kafka service environments #1674). When VRO is run locally or as part of integration tests, the mock-kafka-server will be used instead.

Verify connection to Kafka clusters using kcat (a.k.a. kafkacat)

  1. Visit Lightkeeper-tool, set up the Lighthouse and kubectl CLI tools, and get the config file; then set a shell alias kcdev for running kubectl against the va-abd-rrd-dev env
  2. Run kcdev apply -f sleep-pod.yaml to deploy a k8s pod to the dev namespace, using a manifest file like the one below (run kcdev get pods and you should see the pod in the list)
apiVersion: v1
kind: Pod
metadata:
  name: sleep-pod
  namespace: va-abd-rrd-dev
spec:
  containers:
    - name: sleep
      image: debian:latest
      command: ["/bin/sleep", "3650d"]
      resources:
        limits:
          cpu: "1"
          memory: "1Gi"
  3. Run kcdev exec -it sleep-pod -- /bin/sh to access the pod
  4. Run apt-get update && apt-get install kafkacat to install kcat, then exit the pod
  5. Copy the CA, private key, and certificate files from here (in VA network), along with a local kafka_config.txt file, into the pod's /tmp folder, e.g. kcdev cp /test.vro.bip.va.gov.pem sleep-pod:/tmp/DEV

kafka_config.txt content

bootstrap.servers=kafka.dev.bip.va.gov:443
security.protocol=SSL
ssl.key.location=test.vro.bip.va.gov.pem
ssl.certificate.location=test.vro.bip.va.gov.crt.pem
ssl.ca.location=VACACerts.pem
  6. Access the pod again, cd to the /tmp folder, and run kcat -F kafka_config.txt -L to list the cluster's brokers and topics
  7. Consume messages from a topic by running kcat -F kafka_config.txt -t TST_CONTENTION_BIE_CONTENTION_ASSOCIATED_TO_CLAIM_V02 -C

Create keystore and truststore files from pem files

  1. Run this to create the keystore file
openssl pkcs12 -export -in test.vro.bip.va.gov.crt -inkey test.vro.bip.va.gov.key -out dev-keystore.p12 -name kafka-keystore-dev -CAfile VACACerts.pem -caname root
  • test.vro.bip.va.gov.crt: the certificate file
  • test.vro.bip.va.gov.key: the private key file
  • VACACerts.pem: the CA certificates file
  2. Run this to create the truststore file (a sketch of referencing both files in the Kafka client configuration follows these steps)
keytool -import -file VACACerts.pem -alias kafka-truststore-dev -keystore truststore.p12 -storetype pkcs12
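
The resulting dev-keystore.p12 and truststore.p12 can then be referenced from the Kafka client's SSL settings. Below is a minimal sketch in Java using the plain kafka-clients consumer API; the file paths, passwords, and group id are illustrative placeholders, not actual VRO values.

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaSslConfigSketch {
  // Builds consumer properties that point at the keystore/truststore created above.
  static Properties consumerProperties() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.dev.bip.va.gov:443");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/dev-keystore.p12"); // placeholder path
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "<keystore-password>"); // placeholder
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.p12"); // placeholder path
    props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<truststore-password>"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "svc-bie-kafka"); // illustrative group id
    return props;
  }
}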

svc-bie-kafka

  • When this microservice starts up, it will immediately subscribe to Kafka topics based on its configuration settings.
    • It authenticates to BIE's Kafka service (or the mock server); only this microservice holds the credentials and is responsible for authenticating (microservice clients don't need to provide BIE credentials).
  • When Kafka-topic events come in, send a message to the corresponding RabbitMQ exchange with a payload as follows (see the notifier sketch after this list):
# exchange: 'bie-events-contention-associated'
  (async, one-way) payload = {
    "topic": "TST_CONTENTION_BIE_CONTENTION_ASSOCIATED_TO_CLAIM_V02",
    "notifiedAt": "2023-06-12T19:29 UTC+00:00", // when `svc-bie-kafka` received the event
    "event": { ...entire Kafka event object }
  }
# exchange: 'bie-events-contention-deleted'
  (async, one-way) payload = {
    "topic": "TST_CONTENTION_BIE_CONTENTION_DELETED_V02",
    "notifiedAt": "2023-06-12T19:30 UTC+00:00",
    "event": { ...entire Kafka event object }
  }
  • The RabbitMQ exchange names will be a priori constants, so the microservice and its clients know which exchange to use for each Kafka topic.
  • For the RabbitMQ exchange, create a topic exchange (or fanout exchange if desired), which will allow multiple svc-bie-kafka clients to subscribe to the topic.
    • Using routing keys was considered, but we decided to implement a more straightforward 1-to-1 mapping between Kafka topics and RabbitMQ exchanges.
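
A minimal sketch of the kafka-client → kafkaEvent-notifier flow, assuming Spring Kafka and Spring AMQP are used; the class name, listener wiring, and empty routing key are illustrative, and a JSON message converter is assumed to be configured on the RabbitTemplate.

import java.time.OffsetDateTime;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaEventNotifier {
  // Kafka topic -> RabbitMQ exchange: a priori constants shared with client components.
  private static final Map<String, String> TOPIC_TO_EXCHANGE = Map.of(
      "TST_CONTENTION_BIE_CONTENTION_ASSOCIATED_TO_CLAIM_V02", "bie-events-contention-associated",
      "TST_CONTENTION_BIE_CONTENTION_DELETED_V02", "bie-events-contention-deleted");

  private final RabbitTemplate rabbitTemplate;

  public KafkaEventNotifier(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  @KafkaListener(topics = {
      "TST_CONTENTION_BIE_CONTENTION_ASSOCIATED_TO_CLAIM_V02",
      "TST_CONTENTION_BIE_CONTENTION_DELETED_V02"})
  public void onKafkaEvent(ConsumerRecord<String, String> record) {
    // Wrap the Kafka event in the payload shape described above.
    Map<String, Object> payload = Map.of(
        "topic", record.topic(),
        "notifiedAt", OffsetDateTime.now().toString(), // when svc-bie-kafka received the event
        "event", record.value());
    // Empty routing key: a topic/fanout exchange delivers to every bound queue.
    rabbitTemplate.convertAndSend(TOPIC_TO_EXCHANGE.get(record.topic()), "", payload);
  }
}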

xample-workflow

  • Implement an example VRO component that subscribes to RabbitMQ exchange(s) upon startup.
    • If using Java, it's encouraged to add a new Camel route to domain-xample/xample-workflows/ that does the event handling when a message is available in the RabbitMQ exchange (see the route sketch at the end of this section).
  • When a Kafka event comes through the RabbitMQ topic exchange, log the event and save it to the DB.
    • Initially, structure the DB entity generically. Iteratively add more specificity when we know what event fields should be stored as DB columns.
-- table: 'bie_event'
- uuid: "PK: Primary key used to reference this DB record"
- created_at: "Creation time of this DB record"
- notified_at: "When the event was received by VRO"
- event_type (indexed): "Enum: contention-associated, contention-deleted, ..."
- event_details: "JSON text with PII removed"
- event_at: "Event timestamp extracted from event_details"
- benefit_claim_id (indexed): "Benefit Claim ID extracted from event_details"
- contention_id (indexed): "Contention ID extracted from event_details"
- classification (indexed): "Classification of contention extracted from event_details"

-- table: 'diagnostic_code'
- code: "PK: Diagnostic Codes extracted from event_details"

-- many-to-many association table: 'bie_event_to_diagnostic_code'
- bie_event_uuid: "FK: references bie_event.uuid"
- diagnostic_code: "FK: references diagnostic_code.code"
# Follow guidance at https://stackoverflow.com/a/9790225
  • Timestamps without a time zone should be in GMT
  • Create an integration test that registers a Kafka topic and publishes events to the topic, then checks if expected DB entry exists.
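
A minimal sketch of the xample-workflow side, assuming a Camel route with the camel-rabbitmq component; the queue name, route id, and the saveBieEvent bean are hypothetical placeholders for whatever the real workflow uses to persist a bie_event row.

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class BieKafkaEventRoute extends RouteBuilder {
  @Override
  public void configure() {
    // Bind a queue to the topic exchange that svc-bie-kafka publishes to.
    from("rabbitmq:bie-events-contention-associated"
        + "?exchangeType=topic&queue=xample-bie-events&autoDelete=false")
        .routeId("bie-kafka-event-handler")
        .log("BIE event received: ${body}")
        // saveBieEvent is a hypothetical bean that maps the payload onto a bie_event record
        .to("bean:saveBieEvent");
  }
}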

Future Improvements

The following possible improvements are not currently part of the Consume BIP contention event streams #1642 epic unless there is sufficient reason to include one or more of them.

  • Resilience: Store Kafka subscription state in Redis for observability.
  • Correctness when scaling: When multiple svc-bie-kafka instances are running, ensure there are no duplicate notification events in the RabbitMQ exchange.