Question: Trying to find the best way to join multiple topics  #463

Description

@Quinoa3

I have three topics of distinct messages, all keyed by event ID.
The goal is to aggregate the messages and, once all three have arrived, emit a combined message onto a new stream. Currently I have three groups, each with a process callback defined as follows. All three are similar.

func processPageView(ctx goka.Context, msg interface{}) {
	pageView := msg.(*message.PageViewMessage)
	baseEventValue := ctx.Join(baseEventTable)
	productInfoValue := ctx.Join(productInfoTable)
	if baseEventValue != nil && productInfoValue != nil {
		fullSignal := &message.FullSignal{}
		fullSignal.ProductInfoMessage = productInfoValue.(*message.ProductInfoMessage)
		fullSignal.BaseEventMessage = baseEventValue.(*message.BaseEventMessage)
		fullSignal.PageViewMessage = pageView
		ctx.Emit(fullSignalStream, ctx.Key(), fullSignal)
	} else {
		ctx.SetValue(pageView)
	}
}

func definePageViewGroup() *goka.GroupGraph {
	group := goka.DefineGroup(baseEventGroup,
		goka.Input(pageViewStream, new(codec.PageViewMessageCodec), processPageView),
		goka.Output(fullSignalStream, new(codec.FullSignalCodec)),
		goka.Join(baseEventTable, new(codec.BaseEventMessageCodec)),
		goka.Join(productInfoTable, new(codec.ProductInfoMessageCodec)),
		goka.Persist(new(codec.PageViewMessageCodec)),
	)
	return group
}

Is there a better way to do this?
My last issue: if one of the messages never arrives, how can I still emit a message with the data that does exist?
Thanks
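
For reference, the aggregation described above (collect the parts per event ID, emit once all three are present, and optionally emit a partial result) can be sketched without goka at all. This is only a minimal illustration under stated assumptions: `BaseEvent`, `ProductInfo`, `PageView`, `Aggregator`, `Add`, and `Flush` are hypothetical names, not part of goka or of the code above, and the in-memory map stands in for whatever state store is actually used. Deciding when to call `Flush` (for example after a timeout) is left open here.

```go
package main

// Hypothetical stand-ins for the three message types in the question.
type BaseEvent struct{ ID string }
type ProductInfo struct{ SKU string }
type PageView struct{ URL string }

// FullSignal accumulates whichever parts have arrived for one event ID.
type FullSignal struct {
	Base    *BaseEvent
	Product *ProductInfo
	View    *PageView
}

// Complete reports whether all three parts are present.
func (s *FullSignal) Complete() bool {
	return s.Base != nil && s.Product != nil && s.View != nil
}

// Aggregator keeps partial signals keyed by event ID (in memory only;
// an assumption made for this sketch).
type Aggregator struct {
	pending map[string]*FullSignal
}

// NewAggregator returns an empty Aggregator.
func NewAggregator() *Aggregator {
	return &Aggregator{pending: map[string]*FullSignal{}}
}

// Add merges one part into the signal for key. It returns the combined
// signal and true once all three parts have arrived, and forgets the key.
func (a *Aggregator) Add(key string, part interface{}) (*FullSignal, bool) {
	s, ok := a.pending[key]
	if !ok {
		s = &FullSignal{}
		a.pending[key] = s
	}
	switch p := part.(type) {
	case *BaseEvent:
		s.Base = p
	case *ProductInfo:
		s.Product = p
	case *PageView:
		s.View = p
	}
	if s.Complete() {
		delete(a.pending, key)
		return s, true
	}
	return nil, false
}

// Flush returns whatever has arrived for key, complete or not, and
// forgets the key. The second result is false if nothing was pending.
func (a *Aggregator) Flush(key string) (*FullSignal, bool) {
	s, ok := a.pending[key]
	if ok {
		delete(a.pending, key)
	}
	return s, ok
}
```

In a goka processor, the same accumulation could presumably live in a single group's table, with all three streams as inputs of that group and the partial `FullSignal` read and written through the context, instead of three groups joining each other's tables. That mapping is an assumption and is not shown in the sketch.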
