In this article, we will walk through a complete example of an application that listens to a market data stream API over a WebSocket connection and pushes that data to a Kafka topic.
First of all, let’s take a brief look at the components and technologies that we are going to use in this article.
Spring Integration
Spring Integration is a powerful, adapter-based framework that follows the Spring programming model and enables lightweight messaging. It also acts as an integration layer that lets individual systems communicate with each other.
As we mentioned, Spring Integration is built on the Spring programming model, so it follows two important concepts (see the short sketch after this list):
- Dependency Injection: a technique in which an object receives the other objects it depends on, rather than creating them itself.
- Inversion of Control: a principle that lets a container take control of things such as creating instances of classes and injecting them wherever they are required.
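As a minimal illustration of both ideas, consider the hypothetical sketch below (KlineRepository and MarketDataService are made-up names for this example). The Spring container creates both beans and injects the repository through the constructor, so the service never instantiates its own dependency:

import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
class KlineRepository {
    // persistence logic would live here
}

@Service
class MarketDataService {

    private final KlineRepository repository;

    // The container injects the KlineRepository bean here (constructor
    // injection); the service never calls "new KlineRepository()" itself.
    MarketDataService(KlineRepository repository) {
        this.repository = repository;
    }
}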
Spring Integration is inspired by the book Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf. This book can help you better understand the mechanics of Spring Integration.
Binance API
Binance is one of the largest cryptocurrency exchanges in the world. It offers APIs for accessing market data.
In the example we are going to implement, we will use the Kline/Candlestick Streams endpoint, which provides data over a WebSocket connection.
The details of the API are listed below:
- Base address: wss://stream.binance.com:9443
- The stream we are using in this example: <symbol>@kline_<interval>
You can find the full details in the official Binance API documentation.
Let’s get into implementation.
Implementation
First, we have to initialize a project with the required dependencies. You can use start.spring.io or other methods to initialize a Spring-based project.
Required Dependencies
Add the following dependencies to your pom.xml, or include them while initializing the project.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
</dependency>
Required Entities
Now, we have to create a class to map the incoming data from the Kline/Candlestick Streams endpoint.
package com.trade.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

@Data
public class Kline {

    @JsonProperty("t")
    private Long klineStartTime;

    @JsonProperty("T")
    private Long klineCloseTime;

    @JsonProperty("s")
    private String symbol;

    @JsonProperty("i")
    private String interval;

    @JsonProperty("o")
    private String openPrice;

    @JsonProperty("c")
    private String closePrice;

    @JsonProperty("h")
    private String highPrice;

    @JsonProperty("l")
    private String lowPrice;

    @JsonProperty("x")
    private Boolean isClosed;
}
We created a class whose fields match the parameters of the k object in the Kline/Candlestick Streams response payload.
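For reference, a kline event from this stream looks roughly like the abridged payload below (the values are illustrative, and the real stream carries additional fields that our class simply ignores):

{
  "e": "kline",
  "E": 1672515782136,
  "s": "BNBUSDT",
  "k": {
    "t": 1672515780000,
    "T": 1672515839999,
    "s": "BNBUSDT",
    "i": "1m",
    "o": "246.10",
    "c": "246.30",
    "h": "246.40",
    "l": "246.00",
    "x": false
  }
}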
Resource service.xml
The base structure of service.xml:
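The original file is not reproduced here, so the following is a minimal sketch of a typical Spring Integration XML configuration with the integration and websocket namespaces declared. The bean IDs and channel names used throughout this section are assumptions for illustration, not the author's exact configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/websocket
           http://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">

    <!-- web-socket client, adapters, and chains go here -->

</beans>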
First of all, we have to create a WebSocket client bean and an inbound-channel adapter in service.xml so that we can listen to the Kline/Candlestick Streams endpoint.
Say we want to listen to the BNB/USDT stream; the code for that follows the URL below.
The URL that we are listening to is:
wss://stream.binance.com:9443/ws/bnbusdt@kline_1m
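A sketch of the client bean and the inbound adapter might look like the following (the bean IDs and the marketDataChannel name are assumptions for this example):

<!-- standard JSR-356 web-socket client -->
<bean id="webSocketClient"
      class="org.springframework.web.socket.client.standard.StandardWebSocketClient"/>

<!-- container that opens and manages the connection to the stream URL -->
<bean id="clientWebSocketContainer"
      class="org.springframework.integration.websocket.ClientWebSocketContainer">
    <constructor-arg ref="webSocketClient"/>
    <constructor-arg value="wss://stream.binance.com:9443/ws/bnbusdt@kline_1m"/>
</bean>

<!-- pushes each incoming web-socket message onto marketDataChannel -->
<int-websocket:inbound-channel-adapter id="marketDataInboundAdapter"
                                       container="clientWebSocketContainer"
                                       channel="marketDataChannel"/>

<int:channel id="marketDataChannel"/>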
Now we can create a chain to process the incoming data. The first step in the chain is a transformer called marketDataTransformer, which maps each incoming message to a Kline object; a sketch of such a chain is shown below.
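Since the inbound adapter delivers the raw JSON text of each event, this sketch first converts it to a Map before handing it to our transformer bean. The channel names are, again, assumptions:

<int:chain input-channel="marketDataChannel" output-channel="kafkaPublishChannel">
    <!-- convert the raw JSON text into a Map so our transformer can work with it -->
    <int:json-to-object-transformer type="java.util.Map"/>
    <int:transformer ref="marketDataTransformer"/>
</int:chain>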
MarketDataTransformer Component
As you can see in service.xml, we referenced a transformer bean called marketDataTransformer. Now we have to create the corresponding class, MarketDataTransformer. This class is responsible for parsing the incoming data and mapping it to the Kline class.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.trade.model.Kline;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.springframework.integration.annotation.Transformer;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MarketDataTransformer {

    private final ObjectMapper objectMapper;

    public MarketDataTransformer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        // Ignore fields of the "k" object that we did not map in Kline
        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Transformer
    public Kline transform(Map<String, Object> candle) throws JsonProcessingException {
        // The candlestick fields are nested under the "k" key of the event
        Map<String, Object> klineData = (Map<String, Object>) candle.get("k");
        Kline kline = objectMapper.readValue(new JSONObject(klineData).toString(), Kline.class);
        log.info("Kline successfully mapped [INTERVAL : " + kline.getInterval() + "]");
        return kline;
    }
}
Publish Kline Objects into a Kafka Topic
Now we have Kline objects ready to publish to a Kafka topic. We are not going to cover Kafka configuration details in this article; refer to the Spring for Apache Kafka documentation for more information.
First, we have to create a chain and a transformer, just like we did with MarketDataTransformer; a sketch follows.
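A minimal sketch of that chain, assuming the kafkaPublishChannel name used earlier:

<int:chain input-channel="kafkaPublishChannel" output-channel="nullChannel">
    <!-- the transformer's return value is discarded via the built-in nullChannel -->
    <int:transformer ref="kafkaPublisherTransformer"/>
</int:chain>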
Kafka Java Configuration
The code below is the configuration for connecting to Kafka using Java configuration.
import com.trade.model.Kline;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfiguration {

    private static final String BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public ProducerFactory<String, Kline> producerFactory() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Serialize Kline objects as JSON on the wire
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Bean
    public KafkaTemplate<String, Kline> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
KafkaPublisherTransformer Component
Now we have to create a transformer class called KafkaPublisherTransformer. This class is responsible for publishing Kline objects to the Kafka topic.
import com.trade.model.Kline;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.Transformer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaPublisherTransformer {

    // Replace with your own topic name
    private static final String TOPIC = "market-stream-data";

    private final KafkaTemplate<String, Kline> template;

    public KafkaPublisherTransformer(KafkaTemplate<String, Kline> template) {
        this.template = template;
    }

    @Transformer
    public Object transform(Kline kline) {
        template.send(TOPIC, kline);
        log.info("Published into Kafka");
        // A transformer must return a non-null value; publishing is the side effect here
        return "";
    }
}
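To verify that messages actually land on the topic, you can run the console consumer that ships with Kafka (assuming a local broker and the topic name used above):

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic market-stream-data --from-beginning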
Notice that we are passing data through channels between the different components: the inbound adapter feeds the channel read by MarketDataTransformer, whose output channel in turn feeds KafkaPublisherTransformer. We connected the components with input and output channels.
That’s it. We have integrated the Binance API with Kafka using Spring Integration.
We didn’t cover every detail of Spring Integration in this article; this was just an example to illustrate the mechanism of this powerful framework.
Book Recommendations:
These books can help you better understand Spring Integration:
- Spring Integration in Action by Mark Fisher, Jonas Partner, Marius Bogoevici, and Iwein Fuld.
- Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf.
Source Code:
You can find the full implementation in my Git repository.