Welcome toVigges Developer Community-Open, Learning,Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
467 views
in Technique[技术] by (71.8m points)

Spring WebFlux with Kafka and Websockets

Right now I have a simple Kafka Consumer and Producer implemented in My SpringBoot application , which works fine what I wanna do next is that my consumer takes the consumed message and directly broadcasts it to all subscribed clients. I figured out that i can not use STOMP Messaging with WebFlux, so how can I accomplish this task , I saw the reactive WebSocket implementation but I did not figure out how I could send my consumed data to my websocket.

That is my simple KafkaProducer:

fun addMessage(message: Message){
        val headers : MutableMap<String, Any> = HashMap()
        headers[KafkaHeaders.TOPIC] = topicName
        kafkaTemplate.send(GenericMessage<Message>(message, headers))
    }

And my simple Consumer looks like this:

@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
    fun receiveData(message:Message) :Message{
        //Take consumed data and send to websocket
    }

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

I would consider to have a Sinks.many().multicast().onBackpressureBuffer() as global intermediate container. Then in your receiveData() you just sink data into that Reactor abstraction.

For your WebSocket connected sessions I would suggest to implement a org.springframework.web.reactive.socket.WebSocketHandler and use Sinks.Many.asFlux() in the WebSocketSession.send(Publisher<WebSocketMessage> messages) API. This way all your sessions are going to consume the same Kafka data as long as they are connected to this WebSocket server.

See more info in docs: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websockethandler

UPDATE

You can find some sample here: https://github.com/artembilan/sandbox/tree/master/so-65667450


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to Vigges Developer Community for programmer and developer-Open, Learning and Share
...