The Kafka endpoint is an FEJ transport adapter that provides connectivity to the Kafka streaming platform. It is based on the FIX Antenna Java Kafka adapter and uses the Kafka Producer and Consumer APIs internally.
Kafka endpoint parameters are defined in the conf/kafka-adaptor.properties file. The configuration should list the Kafka endpoints and link each of them to its corresponding Kafka topics.
Configuration properties
The Kafka endpoints are configured by means of the following properties:
Property Name | Description | Required | Default Value |
---|---|---|---|
kafka.clients | Comma-delimited list of Kafka endpoints. A separate configuration section for each listed client should be specified | Y | |
Generic Kafka parameters (applied to all producer and consumer endpoints) | |||
kafka.[kafka_properties] | This section may include multiple options that are applied to all producer and consumer endpoints. Any key from Kafka Producer Configs or Kafka Consumer Configs may be used as a [kafka_property]. For example, this is a good place to define the bootstrap servers for all endpoints. | N | |
Generic Kafka Producer parameters | |||
kafka.producer.key.serializer | The serializer class for a record key that implements the com.epam.fej.kafka.KafkaEndpointKeySerializer interface. | N | |
kafka.producer.value.serializer | The serializer class for a record value that implements the com.epam.fej.kafka.KafkaEndpointValueSerializer interface. | N | |
kafka.producer.[kafka_properties] | This section may include multiple options that are applied to all producer endpoints. Any key from Kafka Producer Configs may be used as a [kafka_property]. | N | |
Kafka Producer parameters | |||
kafka.producer.[ClientName].client.id | The ID string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just an ip/port by allowing a logical application name to be included in server-side request logging. | Y | |
kafka.producer.[ClientName].topic | The topic for publishing by this Kafka producer endpoint. | Y | |
kafka.producer.[ClientName].async.sending | Defines the sending mode for this producer endpoint. If the value is true, the endpoint sends data asynchronously. If the value is false, it waits for confirmation from the Kafka broker for every sent message. | N | false |
kafka.producer.[ClientName].key.serializer | The serializer class for a record key that implements the com.epam.fej.kafka.KafkaEndpointKeySerializer interface. | Y, if kafka.producer.key.serializer property is not defined | |
kafka.producer.[ClientName].value.serializer | The serializer class for a record value that implements the com.epam.fej.kafka.KafkaEndpointValueSerializer interface. | Y, if kafka.producer.value.serializer property is not defined | |
kafka.producer.[ClientName].groups | The list of groups for routing | N | |
kafka.producer.[ClientName].[kafka_properties] | This section may include multiple options that are applied to this producer endpoint only. Any key from Kafka Producer Configs may be used as a [kafka_property]. | N | |
Generic Kafka Consumer parameters | |||
kafka.consumer.key.deserializer | The deserializer class for a record key that implements the com.epam.fej.kafka.KafkaEndpointKeyDeserializer interface. | N | |
kafka.consumer.value.deserializer | The deserializer class for a record value that implements the com.epam.fej.kafka.KafkaEndpointValueDeserializer interface. | N | |
kafka.consumer.[kafka_properties] | This section may include multiple options that are applied to all consumer endpoints. Any key from Kafka Consumer Configs may be used as a [kafka_property]. | N | |
Kafka Consumer parameters | |||
kafka.consumer.[ClientName].client.id | The ID string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just an ip/port by allowing a logical application name to be included in server-side request logging. | Y | |
kafka.consumer.[ClientName].topics | The list of topics to subscribe to | Y, if kafka.consumer.[ClientName].topics.regexp property is not defined | |
kafka.consumer.[ClientName].topics.regexp | The regular expression pattern that matches the topics to subscribe to | Y, if kafka.consumer.[ClientName].topics property is not defined | |
kafka.consumer.[ClientName]. | The time, in milliseconds, that the consumer spends waiting in a poll call if no data is available in the buffer. If 0, the call returns immediately with any records currently available in the buffer, or with an empty result if there are none. The value must not be negative. | N | 200 |
kafka.consumer.[ClientName].key.deserializer | The deserializer class for a record key that implements the com.epam.fej.kafka.KafkaEndpointKeyDeserializer interface. | Y, if kafka.consumer.key.deserializer property is not defined | |
kafka.consumer.[ClientName].value.deserializer | The deserializer class for a record value that implements the com.epam.fej.kafka.KafkaEndpointValueDeserializer interface. | Y, if kafka.consumer.value.deserializer property is not defined | |
kafka.consumer.[ClientName].groups | The list of groups for routing | N | |
kafka.consumer.[ClientName].[kafka_properties] | This section may include multiple options that are applied to this consumer endpoint only. Any key from Kafka Consumer Configs may be used as a [kafka_property]. | N | |
For the full list of Kafka properties refer to the Kafka official documentation.
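The prefix scheme from the table can be illustrated with a short fragment. This is a minimal sketch rather than a recommended setup: the broker address localhost:9092 and the endpoint name MyProducer are hypothetical, while bootstrap.servers, acks, auto.offset.reset, and linger.ms are standard keys from the Kafka Producer and Consumer Configs.

```properties
# Applied to every producer and consumer endpoint: kafka.[kafka_property]
# The broker address is a placeholder.
kafka.bootstrap.servers = localhost:9092

# Applied to all producer endpoints: kafka.producer.[kafka_property]
kafka.producer.acks = all

# Applied to all consumer endpoints: kafka.consumer.[kafka_property]
kafka.consumer.auto.offset.reset = earliest

# Applied to the hypothetical MyProducer endpoint only:
# kafka.producer.[ClientName].[kafka_property]
kafka.producer.MyProducer.linger.ms = 5
```

The table does not state which level wins when the same key is set at more than one level, so avoid defining a key twice unless the precedence has been verified for your installation.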
NOTE: The FIX session can be started or stopped only if scheduling is applied. Otherwise, the session will be inactive. Refer to the Scheduler section where the session start procedure is specified.
NOTE: Changes to the properties file take effect only after the FixEdge Java server is restarted.
Sample configuration
A minimal sample configuration with one Kafka producer endpoint and one Kafka consumer endpoint:
# list of kafka endpoints (consumers and producer)
kafka.clients = KProducer, KConsumer

# main properties to specify producer's id and topic
kafka.producer.KProducer.client.id = KProducer
kafka.producer.KProducer.topic = PTopic1
# Type is class. Serializer class for key that implements the com.epam.fej.kafka.KafkaEndpointKeySerializer interface.
kafka.producer.key.serializer = com.epam.fej.kafka.FIXMessageEventSerializer
# Type is class. Serializer class for value that implements the com.epam.fej.kafka.KafkaEndpointValueSerializer interface.
kafka.producer.value.serializer = com.epam.fej.kafka.FIXMessageEventSerializer

# main properties to specify consumer's id and topic
kafka.consumer.KConsumer.client.id = KConsumer
kafka.consumer.KConsumer.topics = KTopic2
# Type is class. Deserializer class for key that implements the com.epam.fej.kafka.KafkaEndpointKeyDeserializer interface.
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# Type is class. Deserializer class for value that implements the com.epam.fej.kafka.KafkaEndpointValueDeserializer interface.
kafka.consumer.value.deserializer = com.epam.fej.kafka.FIXMessageEventDeserializer
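A variation on the sample, sketched under assumptions: a producer that sends asynchronously and a consumer that subscribes by pattern instead of a fixed topic list, with serializers set per endpoint rather than globally. The endpoint names (AsyncProducer, PatternConsumer), the topic name, the broker address, and the regular expression are hypothetical; the property keys and serializer classes are the ones that appear in the table and sample above. The pattern is assumed to follow Java regular expression syntax.

```properties
# Hypothetical endpoint names
kafka.clients = AsyncProducer, PatternConsumer

# Placeholder broker address shared by both endpoints
kafka.bootstrap.servers = localhost:9092

kafka.producer.AsyncProducer.client.id = AsyncProducer
kafka.producer.AsyncProducer.topic = Orders
# Send without waiting for broker confirmation of every message (default: false)
kafka.producer.AsyncProducer.async.sending = true
# Per-endpoint serializers, required because no generic ones are defined here
kafka.producer.AsyncProducer.key.serializer = com.epam.fej.kafka.FIXMessageEventSerializer
kafka.producer.AsyncProducer.value.serializer = com.epam.fej.kafka.FIXMessageEventSerializer

kafka.consumer.PatternConsumer.client.id = PatternConsumer
# topics.regexp replaces topics; one of the two must be defined
kafka.consumer.PatternConsumer.topics.regexp = Orders-.*
kafka.consumer.PatternConsumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.PatternConsumer.value.deserializer = com.epam.fej.kafka.FIXMessageEventDeserializer
```

With async.sending set to true the endpoint does not wait for a broker confirmation per message, which favors throughput; keep the default of false when each message must be confirmed before the next step.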