From ecb476779dbab031a6780a8cab58cebaa640af6c Mon Sep 17 00:00:00 2001
From: mirwaisx <57108408+mirwaisx@users.noreply.github.com>
Date: Tue, 11 Jul 2023 15:42:10 +0200
Subject: [PATCH] auto topic creation enabled

---
 kq/pusher.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/kq/pusher.go b/kq/pusher.go
index f2c02fe..3956486 100644
--- a/kq/pusher.go
+++ b/kq/pusher.go
@@ -27,10 +27,13 @@ type (
 
 func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
 	producer := &kafka.Writer{
-		Addr:        kafka.TCP(addrs...),
-		Topic:       topic,
+		Addr:  kafka.TCP(addrs...),
+		Topic: topic,
+		//todo move the follwoing to config kpusherConfig?
 		Balancer:    &kafka.LeastBytes{},
 		Compression: kafka.Snappy,
+		//if this is not set, the writer will not create a nonexistent topic
+		AllowAutoTopicCreation: true,
 	}
 	pusher := &Pusher{
 		produer: producer,
@@ -53,7 +56,7 @@ func (p *Pusher) Close() error {
 	if p.executor != nil {
 		p.executor.Flush()
 	}
-	
+
 	return p.produer.Close()
 }