diff --git a/kq/pusher.go b/kq/pusher.go index 7f8b13f..b61ce88 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -112,8 +112,13 @@ func (p *Pusher) KPush(ctx context.Context, k, v string) error { // Push sends a message to the Kafka topic. func (p *Pusher) Push(ctx context.Context, v string) error { + return p.PushWithKey(ctx, strconv.FormatInt(time.Now().UnixNano(), 10), v) +} + +// PushWithKey sends a message with the given key to the Kafka topic. +func (p *Pusher) PushWithKey(ctx context.Context, key, v string) error { msg := kafka.Message{ - Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), // current timestamp + Key: []byte(key), Value: []byte(v), }