Processing Online Stock Market Data Using Spring Integration + Binance API

Processing Online Stock Market Data Using Spring Integration + Binance API

In this article, we will walk through a complete example of an application that listens to a stock market stream API via web-socket and push that data to a Kafka topic.

First of all, let’s take a brief look at the components and technologies that we are going to use in this article.

Spring Integration

Spring integration is a powerful and adapter-based framework that follows the spring programming model and enables lightweight messaging. It is also an integration tool that makes individual systems communicate with each other.

As we mentioned, spring integration is based on the spring programming model so it follows two important concepts:

  • Dependency Injection: a technique in which an object provides the dependencies of another object.
  • Inversion Of Control: a principle that is letting a container take control of things like creating instances of classes and injecting them whenever it is required.

Spring integration is inspired by the book Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf. This book can help you better understand of spring integration mechanism.

Binance API

Binance is the most famous exchange in the world. It offers APIs to access stock market data.

In the example that we are going to implement, we will use Kline/Candlestick Streams endpoint that provides data via web-socket.

Detailed information of the API is written below:

  • Base address: wss://stream.binance.com:9443
  • The endpoint that we are using in this example: @kline_interval

You can find API documentation at this link.

Let’s get into implementation.

Implementation

First, we have to initialize a project with the required dependencies. You can use start.spring.io or others ways to initialize a spring-based project.

Required Dependencies

Add the following dependencies to your pom.xml or you can add them while initializing the project.

				
					<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-websocket</artifactId>
</dependency>

<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-kafka</artifactId>
</dependency>
				
			

Required Entities

Now, we have to create a class to map incoming data from the no.1 Kline/Candlestick Streams endpoint.

				
					import lombok.Data;
import com.fasterxml.jackson.annotation.JsonProperty;

@Data
public class Kline {

    @JsonProperty("t")
    private Long klineStartTime;
    @JsonProperty("T")
    private Long klineCloseTime;
    @JsonProperty("s")
    private String symbol;
    @JsonProperty("i")
    private String interval;
    @JsonProperty("o")
    private String openPrice;
    @JsonProperty("c")
    private String closePrice;
    @JsonProperty("h")
    private String highPrice;
    @JsonProperty("l")
    private String lowPrice;
    @JsonProperty("x")
    private Boolean isClosed;

}
				
			

We created a class containing fields that match with Kline/Candlestick Streams endpoint response object k parameters.

Resource service.xml

We created a class containing fields that match with Kline/Candlestick Streams endpoint response object k parameters.

The service.xml base structure :

				
					
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
       xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">

</beans>
				
			

First of all, we have to create a web-socket client bean and an inbound-channel adapter in service.xml to be able to listen to Kline/Candlestick Streams endpoint.

Consider we want to listen to ‌BNB/USDT stream data. So we can have the code below:

				
					<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
       xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">

    <!--     websocket client bean    -->
    <bean id="webSocketClient" class="org.springframework.web.socket.client.standard.StandardWebSocketClient"/>

    <!--     inbound-channel-adapter && client-container     -->
    <int-websocket:client-container id="clientContainer"
                                    client="webSocketClient"
                                    uri="wss://stream.binance.com:9443/ws/bnbusdt@kline_1m"/>

    <int-websocket:inbound-channel-adapter container="clientContainer"
                                           channel="input"
                                           payload-type="java.util.Map"/>

</beans>
				
			

The URL that we are listening to is:

wss://stream.binance.com:9443/ws/bnbusdt@kline_1m

Now we can create a chain to process incoming data. The first thing to create in the chain is a transformer called MarketDataTransformer. In this transformer, we are going to map each incoming object to a Kline object.

				
					<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
       xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">

    <!--     websocket client bean    -->
    <bean id="webSocketClient" class="org.springframework.web.socket.client.standard.StandardWebSocketClient"/>

    <!--     inbound-channel-adapter && client-container     -->
    <int-websocket:client-container id="clientContainer"
                                    client="webSocketClient"
                                    uri="wss://stream.binance.com:9443/ws/bnbusdt@kline_1m"/>

    <int-websocket:inbound-channel-adapter container="clientContainer"
                                           channel="input"
                                           payload-type="java.util.Map"/>
                                           
                                         
   <int:chain input-channel="input" output-channel="kafkaInput">

       <!--      only closed candle allowed      -->
       <int:filter expression="payload.k.x == true"/>
     
       <int:transformer id="marketDataTransformer" ref="marketDataTransformer" method="transform"/>

   </int:chain>
        
</beans>
				
			

MarketDataTransformer Component

As you can see in service.xml we have been created a transformer called marketDataTransformer. Now we have to create a transformer class called MarketDataTransformer. This class is responsible to parse the incoming data and map it to Kline class.

				
					import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.trade.model.Kline;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.springframework.integration.annotation.Transformer;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MarketDataTransformer {

    private final ObjectMapper objectMapper;

    public MarketDataTransformer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Transformer
    public Kline transform(Map<String , Object> candle) throws JsonProcessingException {
        Map<String , Object> klineData = (Map<String, Object>) candle.get("k");

        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        Kline kline = objectMapper.readValue(new JSONObject(klineData).toString() , Kline.class);

        log.info("Kline successfully mapped [INTERVAL : " + kline.getI() + "]");

        return kline;
    }
}
				
			

Publish Kline Objects into a Kafka Topic

Now we have kline objects ready to publish into Kafka topic. We are not going to Kafka configuration details in this article. Refer to this link for more information about Kafka configuration.

First, we have to create a chain and a transformer just like the MarketDataTransformer.

				
					<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
       xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">

    <!--     websocket client bean    -->
    <bean id="webSocketClient" class="org.springframework.web.socket.client.standard.StandardWebSocketClient"/>

    <!--     inbound-channel-adapter && client-container     -->
    <int-websocket:client-container id="clientContainer"
                                    client="webSocketClient"
                                    uri="wss://stream.binance.com:9443/ws/bnbusdt@kline_1m"/>

    <int-websocket:inbound-channel-adapter container="clientContainer"
                                           channel="input"
                                           payload-type="java.util.Map"/>
                                           
                                         
   <int:chain input-channel="input" output-channel="kafkaInput">

       <!--      only closed candle allowed      -->
       <int:filter expression="payload.k.x == true"/>
     
       <int:transformer id="marketDataTransformer" ref="marketDataTransformer" method="transform"/>

   </int:chain>
   
   
   <int:chain input-channel="kafkaInput" output-channel="nullChannel">

        <!--        publish data to kafka topic       -->
        <int:transformer id="kafkaPublisherTransformer" ref="kafkaPublisherTransformer" method="transform"/>

   </int:chain>
        
</beans>
				
			

Kafka Java Configuration

The code below is the configuration for connecting to Kafka using java.

				
					import com.trade.model.Kline;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfiguration {

    private static final String BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public ProducerFactory<String , Kline> producerFactory(){
        Map<String, Object> conf = new HashMap<>();
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , BOOTSTRAP_SERVER);
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class);
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(conf);
    }


    @Bean
    public KafkaTemplate<String , Kline> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}
				
			

KafkaPublisherTransformer Component

Now we have to create a transformer class called KafkaPublisherTransformer. This class is responsible to publish kline objects to Kafka topic.

				
					import com.trade.model.Kline;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.Transformer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaPublisherTransformer {

    // your own topic name
    private static final String TOPIC = "market-stream-data";

    private final KafkaTemplate<String , Kline> template;

    public KafkaPublisherTransformer(KafkaTemplate<String, Kline> template) {
        this.template = template;
    }

    @Transformer
    public Object transform(Kline kline){
        template.send(TOPIC , kline);

        log.info("Published into kafka");

        return "";
    }
}
				
			

Notice that we are passing data through channels to different components. We connected components with input and output channels.

That’s it. We have been integrated binance API and Kafka using spring integration.

We didn’t cover all the details about spring integration in this article. It was just an example to understand the mechanism of this powerful framework.

Book Recommendation:

These books can help better understand spring integration:

Spring integration in action by: Mark Fisher, Jonas Partner, Marius Bogoevici, and Iwein Fuld.

Enterprise Integration Patterns by: Gregor Hohpe and Bobby Woolf.

Source Code:

You can find full implementation in my git repository.


 

Author: Danial Eskandari

میانگین آرا: 0 / 5. شمارش رای‌ها: 0

Leave a Reply

Your email address will not be published. Required fields are marked *