@@ -330,7 +330,7 @@ Create cluster -> Basic
330330-> Begin configuration
331331<br ><br >
332332<img width =" 600 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/8b188706-57f3-410e-8af9-de32b2aba91b " >
333- <br >< br >
333+ <br >
334334<img width =" 350 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/ea734e4b-1f4a-4bea-992c-2d03d8d7d021 " >
335335
336336-> Launch cluster
@@ -349,7 +349,7 @@ We will need an API Key to allow applications to access our cluster.
349349
350350-> Create key
351351<br ><br >
352- <img width =" 450 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/f94892bd-6027-49d7-aa6e-53702ced8a40 " >
352+ <img width =" 400 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/f94892bd-6027-49d7-aa6e-53702ced8a40 " >
353353
354354Global access -> Next
355355
@@ -363,20 +363,44 @@ Download and save the key somewhere for future use.
363363<p align =" left " >
364364 <img width =" 250 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/63252f75-54ab-4166-9902-f3635e86ba8f " >
365365  ;
366- <img width =" 450 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/f4e92617-719a-42d2-a464-99a603094ae7 " >
366+ <img width =" 400 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/f4e92617-719a-42d2-a464-99a603094ae7 " >
367367</p >
368368
369+ ## Create Topic
370+ [ Reference] ( https://developer.confluent.io/courses/apache-kafka-for-dotnet/producing-messages-hands-on/#create-a-new-topic )
371+
372+ A topic is an immutable, append-only log of events. Usually, a topic is comprised of the same kind of events.
373+
374+ <img width =" 450 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/8338369a-158d-4932-8d82-3b301a85a628 " >
375+ <br ><br >
376+
377+ Create a new topic, ` RawBiometricsImported ` , which you will use to produce and consume events.
378+
379+ <img width =" 450 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/324d25d8-a71f-4485-86b0-5470d45370d6 " >
380+
381+ -> Create with defaults
382+
383+ When asked to ** Define a data contract** select ** Skip** for now.
384+
385+ ## Populate config file using API Key file you downloaded earlier
386+ - The Kafka.BootstrapServers is the Bootstrap server in the file.
387+ - The Kafka.SaslUsername is the key value in the file.
388+ - The Kafka.SaslPassword is the secret value in the file.
389+ - SchemaRegistry.URL is the Stream Governance API endpoint url.<br >
390+ <img width =" 250 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/6a7404d5-b3ec-47b7-9a03-883b53c4ea01 " >
391+ - SchemaRegistry.BasicAuthUserInfo is ` <key>:<secret> ` from the API Key file you downloaded for the Schema Registry.
392+
393+ Store user name and passwords inside ` secrets.json ` and other details in ` appsettings.json ` .
394+
369395## Kafka Messages
370- <img width =" 550 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/2c56d72f-9788-437f-9715-2de12d2b3731 " >
396+ <img width =" 500 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/2c56d72f-9788-437f-9715-2de12d2b3731 " >
371397
372398### Event
373399A domain event signals something that has happened in the outside world that is of interest to the application.
374400
375401Events are something that happened in the past. So they are immutable.
376402
377- Use past tense when naming events.
378-
379- For eg: UserCreated, UserAddressChanged etc.
403+ Use past tense when naming events. For eg: ` UserCreated ` , ` UserAddressChanged ` etc.
380404
381405### Kafka Message Example
382406``` cs
@@ -386,14 +410,13 @@ var message = new Message<string, Biometrics>
386410 Value = metrics
387411};
388412```
389- If you care about message ordering, provide key, otherwise it's optional.
390-
413+ If you care about message ordering, provide key, otherwise it's optional.
391414In above example, because we're using ` DeviceId ` as key, all messages of that specific device are handled in order.
392415
393416Value can be a primitive type such as string or some object that can be serialized into formats such as JSON, Avro or Protobuf.
394417
395418## Producing messages to a topic
396- <img width =" 550 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/1d0706c6-5b6e-465f-ba32-2aef3628dd8d " >
419+ <img width =" 500 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/1d0706c6-5b6e-465f-ba32-2aef3628dd8d " >
397420
398421You can consider the messages being produced by your system to be just another type of API.
399422
@@ -413,85 +436,55 @@ Some APIs will be consumed through HTTP while others might be consumed through K
413436 // Used to identify the producer.
414437 // In other words, to give it a name.
415438 // Although it's not strictly required, providing a ClientId will make debugging a lot easier.
416- "ClientId" : " my-dotnet-kafka "
439+ "ClientId" : " ClientGateway "
417440 }
418441```
419442
420443Grab the config
421444``` cs
422- var producerConfig = builder .Configuration .GetSection (" KafkaProducer" ).Get <ProducerConfig >();
445+ builder . Services . Configure < ProducerConfig >( builder . Configuration . GetSection ( " KafkaProducer " )); // OR var producerConfig = builder.Configuration.GetSection("KafkaProducer").Get<ProducerConfig>();
423446```
424447
425- Create the Producer
426- ``` cs
427- using var producer = new ProducerBuilder <string , Biometrics >(producerConfig ).Build ();
428- ```
448+ ## Produce messages in your Web API app
449+ Go to the web api you created earlier.
429450
430- Send the message
451+ It will work as a simple REST endpoint that accepts data from a fitness tracker in the form of strings and pushes it to Kafka with no intermediate processing.
452+
453+ In the long run, this may be dangerous because it could allow a malfunctioning device to push invalid data into our stream. We probably want to perform a minimal amount of validation, prior to pushing the data. We'll do that later.
454+
455+ Register an instance of ` IProducer<string, string> ` . We use a singleton because the producer maintains connections that we want to reuse.
431456``` cs
432- var result = await producer .ProduceAsync (BiometricsImportedTopicName , message );
457+ builder .Services .AddSingleton <IProducer <string , string >>(sp =>
458+ {
459+ var config = sp .GetRequiredService <IOptions <ProducerConfig >>();
460+ return new ProducerBuilder <string , Biometrics >(config .Value )
461+ .Build ();
462+ });
433463```
434464
435- The messages aren't necessarily sent immediately.
436- They may be buffered in memory so that multiple messages can be sent as a batch.
437- Once we're sure we want the messages to be sent, it's a good idea to call the Flush method.
438-
465+ Send the message
439466``` cs
467+ var result = await producer .ProduceAsync (biometricsImportedTopicName , message );
440468// Synchronous method, so it will wait for acknowledgement from broker before continuing
441469// It's often better to produce multiple messages into a batch prior to calling Flush.
442470producer .Flush ();
443471```
444472
445- ## Create Topic
446- [ Reference] ( https://developer.confluent.io/courses/apache-kafka-for-dotnet/producing-messages-hands-on/#create-a-new-topic )
447-
448- A topic is an immutable, append-only log of events. Usually, a topic is comprised of the same kind of events.
449-
450- <img width =" 450 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/8338369a-158d-4932-8d82-3b301a85a628 " >
451-
452- Create a new topic, ` RawBiometricsImported ` , which you will use to produce and consume events.
453-
454- <img width =" 450 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/324d25d8-a71f-4485-86b0-5470d45370d6 " >
455-
456- -> Create with defaults
457-
458- When asked to ** Define a data contract** select ** Skip** .
459-
460- ## Populate config file using API Key file you downloaded earlier
461- - The Kafka.BootstrapServers is the Bootstrap server in the file.
462- - The Kafka.SaslUsername is the key value in the file.
463- - The Kafka.SaslPassword is the secret value in the file.
464- - SchemaRegistry.URL is the Stream Governance API endpoint url.<br >
465- <img width =" 200 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/6a7404d5-b3ec-47b7-9a03-883b53c4ea01 " >
466- - SchemaRegistry.BasicAuthUserInfo is ` <key>:<secret> ` from the API Key file you downloaded for the Schema Registry.
467-
468- Store user name and passwords inside ` secrets.json ` and other details in ` appsettings.json ` .
469-
470- ## Produce messages
471- Go to the web api you created earlier.
472-
473- It will work as a simple REST endpoint that accepts data from a fitness tracker in the form of strings and pushes it to Kafka with no intermediate processing.
474-
475- In the long run, this may be dangerous because it could allow a malfunctioning device to push invalid data into our stream. We probably want to perform a minimal amount of validation, prior to pushing the data. We'll do that later.
476-
477- Register an instance of ` IProducer<string, string> ` .
478- We use a singleton because the producer maintains connections that we want to reuse.
479- ``` cs
480- var producerConfig = builder .Configuration .GetSection (" KafkaProducer" ).Get <ProducerConfig >();
481- builder .Services .AddSingleton (new ProducerBuilder <string , string >(producerConfig ).Build ());
482- ```
473+ The messages aren't necessarily sent immediately.
474+ They may be buffered in memory so that multiple messages can be sent as a batch.
475+ Once we're sure we want the messages to be sent, it's a good idea to call the ` .Flush() ` method.
483476
484477### Test it
485478#### Start the app
486- <img width =" 200 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/f7250701-7d04-4477-bead-6c9c5f83db7e " >
479+ <img width =" 300 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/f7250701-7d04-4477-bead-6c9c5f83db7e " >
487480
488481#### Send a message to the endpoint through Swagger
489- <img width =" 550 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/a83c9991-e5b9-4256-863a-a9461490aec6 " >
482+ <img width =" 500 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/a83c9991-e5b9-4256-863a-a9461490aec6 " >
490483
491484#### Verify it in the cluster
492485Home -> Environments -> kafka-with-dotnet -> cluster_0 -> Topics
493486
494- <img width =" 550 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/974b3b19-fa64-4402-8e40-8b2b3b48cfc1 " >
487+ <img width =" 500 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/974b3b19-fa64-4402-8e40-8b2b3b48cfc1 " >
495488
496489## Serialization & Deserialization
497490The message producer is created by providing two types.
@@ -528,19 +521,18 @@ If a matching schema is found,the current message and any future ones that use t
528521
529522<img width =" 550 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/0448c387-860b-4e1b-863a-5808c65ecd93 " >
530523
531- However, if no matching schema is found,then any messages that use that schema will be rejected.
532- Essentially, an exception is thrown.
533- This ensures that each message going to Kafka matches the required format.
524+ However, if no matching schema is found,then any messages that use that schema will be rejected. Essentially, an exception is thrown. This ensures that each message going to Kafka matches the required format.
534525
535526## Schemas & Serialization
536527### Create a new topic
537528Create a new topic named ` BiometricsImported ` (use the default partitions).
538529
539530Define a data contract -> Create a schema for message values
540531
541- <img width =" 500 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/bf734225-8385-4252-b79a-6c9334a41ebb " >
532+ <img width =" 400 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/bf734225-8385-4252-b79a-6c9334a41ebb " >
542533
543534-> Create Schema
535+ <br ><br >
544536
545537<img width =" 600 " alt =" image " src =" https://github.com/akhanalcs/dotnet-kafka/assets/30603497/04a1e110-e193-44c0-b6da-fc3c4f8fae09 " >
546538
@@ -590,11 +582,14 @@ Copy paste the above JSON into https://codebeautify.org/jsonviewer and add your
590582```
591583
592584In above step, essentially you were just mapping these records into a JSON format
593- https://github.com/akhanalcs/dotnet-kafka/blob/e05177c04bc471b17fdb081db8afcc952d3473d5/Producer/Program.cs#L41-L42
585+ ``` cs
586+ record Biometrics (Guid DeviceId , List <HeartRate > HeartRates , int MaxHeartRate );
587+ record HeartRate (DateTime DateTime , int Value );
588+ ```
594589
595590### Connect the app to Schema Registry
596591Add 2 packages:
597- https://github.com/akhanalcs/dotnet-kafka/blob/df7850306df39f57b7f89be05a84077d6a773577/Producer/Producer.csproj#L13C1-L14C92
592+ https://github.com/akhanalcs/dotnet-kafka/blob/df7850306df39f57b7f89be05a84077d6a773577/Producer/Producer.csproj#L13-L14
598593
599594Register an instance of a ` ISchemaRegistryClient ` using a ` new CachedSchemaRegistryClient `
600595https://github.com/akhanalcs/dotnet-kafka/blob/df7850306df39f57b7f89be05a84077d6a773577/Producer/Program.cs#L29-L34
0 commit comments