Any way to customize the key of a KTable? #413
Comments
By KTable, do you mean that another processor will work on the table, or that you want to store the message under a different key in the same processor?

If another processor will work on it, just emit the message to another topic/table with the new key, like this:

    goka.NewProcessor(... ,
        goka.DefineGroup(... ,
            goka.Input("topic", codec, func(ctx goka.Context, msg interface{}) {
                value := msg.(MessageType)
                // re-emit the same message under the key carried in its payload
                ctx.Emit("new-key-topic", value.NewKey, msg)
            }),
            goka.Output("new-key-topic", codec)),
    )

If it's within the same processor, use the loop edge, which provides a way to send a message back to the same processor through an auto-created topic:

    goka.NewProcessor(... ,
        goka.DefineGroup(... ,
            goka.Input("topic", codec, func(ctx goka.Context, msg interface{}) {
                value := msg.(MessageType)
                // send the message back to this processor under the new key
                ctx.Loopback(value.NewKey, msg)
            }),
            goka.Loop(codec, func(ctx goka.Context, msg interface{}) {
                // here we'll receive msg with the correct key and can simply store it
                ctx.SetValue(msg)
            }),
            goka.Persist(codec)),
    )

The code above is more pseudocode, of course; you'd have to take care of the actual codecs, topic names, and so on. Hope that helps. Let me know if you have more questions.
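To make the "actual codecs" part a little more concrete, here is a minimal, standard-library-only sketch of a JSON codec and a key-extraction helper for a click event. The `ClickEvent` type, its `user_id` and `url` fields, and the `NewKey` helper are hypothetical names chosen for illustration; they are not part of goka. The `Encode`/`Decode` method pair is written to match the shape goka expects from a codec as far as I know, but check that against the goka version you use.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ClickEvent is a hypothetical payload; the field names are assumptions.
type ClickEvent struct {
	UserID string `json:"user_id"`
	URL    string `json:"url"`
}

// ClickCodec (de)serializes ClickEvent values as JSON.
type ClickCodec struct{}

// Encode turns a *ClickEvent into JSON bytes.
func (ClickCodec) Encode(value interface{}) ([]byte, error) {
	ev, ok := value.(*ClickEvent)
	if !ok {
		return nil, fmt.Errorf("unexpected type %T", value)
	}
	return json.Marshal(ev)
}

// Decode parses JSON bytes into a *ClickEvent.
func (ClickCodec) Decode(data []byte) (interface{}, error) {
	var ev ClickEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return nil, err
	}
	return &ev, nil
}

// NewKey returns the key a decoded message should be re-emitted under.
// Inside a processor callback, the result would be passed to
// ctx.Emit or ctx.Loopback in place of value.NewKey.
func NewKey(msg interface{}) (string, error) {
	ev, ok := msg.(*ClickEvent)
	if !ok {
		return "", fmt.Errorf("unexpected type %T", msg)
	}
	if ev.UserID == "" {
		return "", errors.New("click event has no user id")
	}
	return ev.UserID, nil
}

func main() {
	codec := ClickCodec{}
	msg, err := codec.Decode([]byte(`{"user_id":"user-42","url":"/home"}`))
	if err != nil {
		panic(err)
	}
	key, err := NewKey(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println("re-emit under key:", key)
}
```

Keeping the key extraction in a small function that returns an error means a message without a usable ID can be skipped or logged instead of being emitted under an empty key.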
How can I change the key of a message when I save a value to a KTable in a callback? Or use another key when emitting to other topics?
For example:
I have a click-event topic in which the message keys are random numbers, and I want to forward those messages to another topic keyed by the user ID contained in each message.