
(Available starting from version 6.10.0 of FIXEdge C++)

Overview

The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server connectivity to the Kafka distributed streaming platform.

Internally, its implementation uses the Kafka Consumer and Producer APIs.

The Kafka TA acts as a FIXEdge plugin and establishes a connection to a Kafka single instance or a Kafka cluster. The adapter supports TLS connectivity.

The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.

Kafka TA performance is close to Kafka's native processing speed. On average, 30,000 or more FIX messages are read/written per second. This number refers to the overall throughput capacity of the FIXEdge server with the Kafka TA (from FIX session to Kafka server).

The Kafka TA automatically reconnects to Kafka if connection to a message broker was unexpectedly terminated. Reconnection behavior can be configured by parameters specified in the Configuration parameters section below.
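The reconnection parameters can be tuned in FIXEdge.properties; the parameter names below come from the Configuration parameters section, while the values are purely illustrative:

```properties
# Start re-connect attempts 100 ms after a disconnect,
# backing off to at most 10 seconds between attempts (illustrative values)
TransportLayer.KafkaTA.reconnect.backoff.ms = 100
TransportLayer.KafkaTA.reconnect.backoff.max.ms = 10000
```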

Concept

The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.

Communication is provided between the FIXEdge server and a Kafka broker or cluster on the FIXEdge side.

The Kafka TA allows integration with Kafka via the Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push and pull messages to/from brokers, which organize and store messages by topic. Brokers commit messages to disk storage for a configurable retention period or size, set per topic. Topics are further broken down into partitions, each partition acting as a separate commit log. The broker keeps track of consumers' read positions in each partition by means of offsets, thereby providing guaranteed ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.

The Kafka TA differs from other Transport Adapters in providing custom serialization/deserialization and durable message storage.

The schema below represents an interaction between FIXEdge and Customer via the Kafka solution.

Basic elements of the Kafka connectivity model

  • Producer = publisher, sender of messages
  • Consumer = subscriber, reader of messages
  • Message/event = a single unit of data
  • Broker = a server forming a storage layer containing one or more topics
  • Topic = ordered collection of events that are organized and stored in a durable way
  • Partition = buckets located within topics across which messages are distributed
  • Commit log = a record of changes that is committed to disk and can be replayed by consumers
  • Guaranteed ordering = a guarantee that a consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written
  • Offset = a tracking feature that lets a consumer remember its position when it last read a partition

Client behavior

When several clients are configured for the Kafka TA (one Producer, one Consumer, or both), all of them establish a connection to the Kafka platform when FIXEdge starts.
If one of the clients fails to establish a connection, the remaining clients are not affected by this failure.
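As a sketch, independent connections can be illustrated with two adapter sessions; the session names here are hypothetical, and the parameters are the ones described in the Configuration parameters section:

```properties
# Two independent adapter sessions; a connection failure in one does not affect the other
TransportLayer.KafkaTA.Sessions = KafkaIn, KafkaOut

TransportLayer.KafkaTA.KafkaIn.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.KafkaIn.Consumer.Topics = outputTopic

TransportLayer.KafkaTA.KafkaOut.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.KafkaOut.Producer.Topic = topic
```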

Delivery

The Kafka TA provides guaranteed message delivery. The connectivity mechanism preserves FIX messages even if critical situations occur, such as:

  • the FIXEdge server is terminated
  • the Kafka TA crashes or fails to start due to an invalid configuration
  • a FIX session is terminated non-gracefully
  • the message broker is down
  • a network failure occurs

If any unsent messages are found in the Persistent Messages Storage, the preserved messages are processed (before any other runtime-messages) as soon as the FIXEdge server starts and Kafka connection is re-established.

The Kafka TA service discipline is FIFO within the scope of one Kafka partition (for messages that come from one client). FIX messages that come from one FIX session preserve their order when passed to the message broker. FIX messages that arrive from a single queue of the message broker preserve their order in the FIX session.

FIX messages to Kafka via Producer API are sent in the same order in which FIXEdge sends the messages to the Client (sync messaging). FIXEdge fetches FIX messages via Consumer API in the same order in which messages arrive at the particular Kafka topic.

When the "group.id" parameter is not configured and its value is to be assigned, the session ID is used as a value of the Kafka TA "group.id" parameter.

Committing to Kafka

When the "Commit" Kafka TA parameter is not configured or set to "Auto" for Consumer, and Kafka sends the message to FIXEdge, then the Kafka TA sends the commit to Kafka after expiration of the configured time interval.

When the "Commit" Kafka TA parameter is set to "Sync" for Consumer, and Kafka sends the message to FIXEdge, then the Kafka TA sends the commit to Kafka when the FIXEdge BL reports to the Kafka TA that the message is delivered.

CHECK!!! When the "Commit" Kafka TA parameter is set to "Async" for Consumer, and Kafka sends the message to FIXEdge, then the Kafka TA sends the commit to Kafka asynchronously.

Async message handling

Sending FIX messages from FIXEdge to Kafka is regarded as async messaging. Messages are saved to the Persistent Messages Storage.

Sync message handling

Sending FIX messages from Kafka to FIXEdge is regarded as sync messaging. Messages are not saved to any storage; they become available at the FIXEdge Business Layer.

Messages arriving at a particular Kafka topic are fetched by FIXEdge via the Consumer API and become available at the FIXEdge Business Layer in the same order.

Configuration steps

To set up the Kafka TA for FIXEdge, the following steps have to be executed:

  1. Copy the FIXEdge distribution package to the FIXEdge home:
    Unpack the content of the distribution package to the FIXEdge root directory, e.g. to C:\FIXEdge

  2. Enable a transport adapter to be used by FIXEdge:
    In the ‘Transport Layer Section’ of the FIXEdge.properties, add the Kafka TA to the list of supported adapters:

    TransportLayer.TransportAdapters = TransportLayer.KafkaTA

    Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:

    TransportLayer.TransportAdapters = TransportLayer.SmtpTA, TransportLayer.RestTA, TransportLayer.KafkaTA
  3. Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties:

    TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
    TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
    TransportLayer.KafkaTA.Sessions = Kafka
     
    TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
    TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
    
    TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
    TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
    TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
    
    TransportLayer.KafkaTA.Kafka.Producer.Topic = topic

    Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).

  4. Configure rules for message routing from the Kafka TA.
    The Kafka TA client is referred to in the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section.

  5. Restart the FIXEdge server to apply the changes.

Configuration parameters

Connection to the Kafka platform can be established once the Kafka TA is properly configured with the Kafka broker address.

The Kafka TA is configured by means of the following properties:

Common properties

TransportLayer.KafkaTA.Description
    Adapter name.
    Required: Y. Default: Kafka Transport Adaptor

TransportLayer.KafkaTA.DllName
    Contains the path and name of the Kafka adapter DLL.
    Required: Y. Default: bin/KafkaTA-vc10-MD-x64.dll

TransportLayer.KafkaTA.Sessions
    Comma-separated list of session names. At least one session should be defined.
    Required: Y.

TransportLayer.KafkaTA.reconnect.backoff.ms
    Delay after which the Kafka TA starts attempting to re-connect if the connection breaks.

TransportLayer.KafkaTA.reconnect.backoff.max.ms
    Maximum delay after which the Kafka TA stops attempting to re-connect if the connection breaks.

TransportLayer.KafkaTA.<Session>.FIXVersion
    Version of the FIX protocol. Acceptable values: FIX40, FIX41, FIX42, FIX43,
    FIX44, FIX50, FIX50SP1, FIX50SP2.
    Default: FIX44

TransportLayer.KafkaTA.<Session>.Serializer
    Serializer name. Acceptable values:
      • Raw
      • XmlWrapper
      • JSON
    The serializer can also be set from an external plugin, in the format
    TestSerializer:KafkaSerializer, where TestSerializer is a serializer Class ID
    inside the plugin and KafkaSerializer is a Plugin ID. ID values are set by
    the plugin developer.
    Default: Raw

TransportLayer.KafkaTA.<Session>.bootstrap.servers
    Initial list of brokers.
    Required: Y.

TransportLayer.KafkaTA.<Session>.security.protocol
    Protocol used to communicate with brokers. A TLS-secured connection is set
    by the 'TLS' value.
    Default: plaintext

TransportLayer.KafkaTA.<Session>.ssl.ca.location
    File or directory path to CA certificate(s) for verifying the broker's key.
    Example: D:/SSL/root.pem
    Required if the security protocol is SSL.

Consumer properties

TransportLayer.KafkaTA.<Session>.Consumer.Commit
    Commit mode. Acceptable values:
      • Auto  - automatically, on time interval expiration
      • Async - asynchronously, after each received message
      • Sync  - synchronously, after each received message
    Default: Auto

TransportLayer.KafkaTA.<Session>.Consumer.group.id
    A unique string that identifies the consumer group the given Consumer
    belongs to. This property is required if the Consumer uses either the group
    management functionality by subscribing to a topic or the Kafka-based offset
    management strategy.
    Required: N. Default: the <Session> ID

TransportLayer.KafkaTA.<Session>.Consumer.Topics
    Topics listened to by the Consumer.
    Example: outputTopic

Producer properties

TransportLayer.KafkaTA.<Session>.Producer.KeyTag
    Tag number in the FIX message whose value is used as the key when the
    message is sent to Kafka.
    Required: N.

TransportLayer.KafkaTA.<Session>.Producer.Topic
    Topic that the Producer sends FIX messages to.
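For example, a session secured with TLS and using an external serializer plugin might combine the properties above as follows (a sketch; the certificate path and the plugin/class IDs are illustrative, matching the format examples given for the Serializer property):

```properties
# TLS-secured session "Kafka" with a plugin-provided serializer (illustrative values)
TransportLayer.KafkaTA.Kafka.security.protocol = TLS
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
TransportLayer.KafkaTA.Kafka.Serializer = TestSerializer:KafkaSerializer
```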

Configuration sample

The samples below represent a Kafka TA configuration with a minimal set of parameters to run the adapter:

Kafka TA configuration file:

KafkaTA.properties
# $Revision: 1.0.0.0 $

#------------------------------------------------------------
# Transport Layer Section
#------------------------------------------------------------

#Comma separated list of identifiers of Transport Adapters should be loaded. 
TransportLayer.TransportAdapters = TransportLayer.KafkaTA

#------------------------------------------------------------
# The Kafka Transport Adaptor (KafkaTA) configuration file.
#------------------------------------------------------------

# Adaptor's name. Property is required 
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor

# Contains path and name of the Kafka adaptor dll. Property is required 
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll

# List of adaptor sessions
TransportLayer.KafkaTA.Sessions = Kafka
 
# Following are parameters for each session
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44

TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID

TransportLayer.KafkaTA.Kafka.Producer.Topic = topic

Kafka Business Layer rules:

BL_Config
<FIXEdge>
    <BusinessLayer>

       <Rule>
            <Source>
                <FixSession SenderCompID="SC" TargetCompID="FE"/>
            </Source>
            <Action>
                <Send><Client Name="Kafka"/></Send>
            </Action>
        </Rule>
       <Rule>
            <Source>
                <Client Name="Kafka"/>
            </Source>
            <Action>
                <Send>
                    <FixSession SenderCompID="FE" TargetCompID="SC"/>
                </Send>
            </Action>
        </Rule>

        <DefaultRule>
            <Action>
                <DoNothing/>
            </Action>
        </DefaultRule>

    </BusinessLayer>
</FIXEdge>


Logging

The Kafka TA provides exhaustive logging: any action involving a configuration parameter is logged.

The list of logged actions:

  • Adapter initialization
  • Parsing validation errors
  • Configuration parameters
  • Sent and received messages

The following levels of logging are used for Kafka:

  • ERROR - is always ON
  • INFO - can be optionally ON 
  • WARN - can be optionally ON 
  • DEBUG - is turned off by default and is used in critical cases

Any configuration event is logged in the "Kafka_TA" log category in the log file via the INFO messages. 

The table below specifies the most common logging messages for Kafka.

ERROR
    Client error:
        "Session '<session_name>': [Consumer/Producer]: <error_message>"
    Outgoing message failed to be queued:
        "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>"
    Incoming message failed to be handled:
        "Session '<session_name>': Consumer: Error handling the incoming message: <native error>. Message (optional): <message>"

INFO
    Client creation / any configuration parameter change.

WARN
    Kafka TA fails to apply a configuration parameter:
        "Session '<session_name>': <warning_message>"
    The outgoing message is rejected because the client is not connected to Kafka:
        "Session '<session_name>': Producer: The outgoing message is rejected: <message>" (visible in the "Kafka_TA" log category)

DEBUG
    Creation of a Consumer or Producer:
        "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>"
    Putting an outgoing message in the queue:
        "Session '<session_name>': Producer: The message is put to the outgoing queue: <message>"
    Taking an outgoing message from the queue:
        "Session '<session_name>': Producer: Sending the message via the producer: <message>"
    Incoming message is being received:
        "Session '<session_name>': Consumer: Receiving the message: <message>"

Logging setup

The Kafka TA writes messages to log under the 'Kafka_TA' log category.

By default the Kafka TA puts log messages into the FIXEdge log. Execute the following steps to configure the Kafka TA logging into a separate file:

  1. Open the FIXEdge properties file.
  2. Add the 'Log.Kafka_TA.File.Name' property, whose value is the path and name of the Kafka TA log file:

    Log.Kafka_TA.File.Name = Kafka.log
  3. Restart the FIXEdge server.

When the Kafka TA starts, the following record will be written into the log file (xxx is the Kafka TA version):

[NOTE] 20070215-11:20:12.437 - Kafka Adaptor v.xxx started.



Custom serialization

When FIX messages are transferred in the raw format (i.e. serialization is not applied), the Kafka TA represents each message as a string containing the tag values.

The serialized format of communication is fully supported by the Kafka TA. Custom serialization relies on specially designed external code and covers serialization of all Kafka message parts:

  • Key
  • Value
  • Header(-s)

Serialization on sending

When custom serialization is applied and FIXEdge sends the message to the particular Kafka topic, this message is serialized via the custom serializer and is sent to the particular Kafka topic via Producer API. The following DEBUG message is logged in the client log category of the log file:

"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>" 

Serialization on receiving

When custom serialization is applied and the new message arrives at the particular Kafka topic, this message is received by FIXEdge via Consumer API. The following DEBUG message is logged in the client log category of the log file:

"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>" 

The message is deserialized via the custom serializer.

Message key processing

In case of the default serialization, the Kafka TA produces the key using the value of the tag configured in the "KeyTag" parameter, and the specified key is passed to the Kafka platform along with the message sent to the particular Kafka topic.
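For the default serialization case, the key tag is configured per Producer via the "KeyTag" parameter; the tag number below is illustrative:

```properties
# Use the value of ClOrdID (tag 11) as the Kafka message key (tag number is illustrative)
TransportLayer.KafkaTA.Kafka.Producer.KeyTag = 11
```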

In case of the custom serialization, the custom plugin produces the key which is passed to the Kafka platform along with the message sent to the particular Kafka topic.

Custom partitioning

If custom partitioning is configured, the custom plugin produces the partition ID based on tag values of a FIX message.

Message Content Wrapping

When the BL Client is configured to work with XMLData (213), and FIXEdge sends a message to the particular Kafka topic, and the XMLData (213) field exists in the FIX message and is not empty, then the value of the XMLData (213) field is sent to the particular Kafka topic via the Producer API.

When BL Client is configured to work with XMLData (213), and FIXEdge sends a message to the particular Kafka topic, and the XMLData (213) field does not exist in the FIX message or this field is empty, then nothing is sent to the particular Kafka topic via Producer API. The corresponding error is sent to the Business Layer.

When BL Client is configured to work with XMLData (213), and Kafka sends a message to FIXEdge, then the FIX message with 35=n (XML message) is received by FIXEdge, and the XMLData (213) field is filled in with the received message, and the XMLDataLen (212) field is filled in with the received message length.

Troubleshooting

Common troubleshooting issues include:

Failure to initialize

If the adapter fails to start up, there is an issue with a configuration parameter.

Check the log for error messages and correct the configuration parameter specified in the error message.


In this example, the path specified to the adapter dll is the incorrect one.

2020-11-25 15:51:39,019 UTC INFO [FixLayer_Version] 20936 Module 'FixLayer' version 0.2.1.5 was loaded.
2020-11-25 15:51:39,019 UTC INFO [Engine] 20936 Custom AdminApplication is registered.
2020-11-25 15:51:39,024 UTC ERROR [TransportLayer] 20936 Transport Adaptor 'KafkaTA' has failed to initialize: Error loading DLL 'd:\FIXEdge\bin\KafkaTA-vc10-MD-x64.dll'.
. The specified module could not be found. (Error code = 126)
2020-11-25 15:51:39,024 UTC INFO [TransportLayer] 20936 Module 'TransportLayer' version 0.1.1.5 was loaded.

Session wasn't created

If, during start-up, one session fails to load while others are created successfully, there is an issue with a configuration parameter.

Check the log for error messages and correct the configuration parameter specified in the error message.


In this example, two sessions ('Kafka' and 'Kafka2') are configured, and one was not created successfully.

In the first session, the wrong parameter, 'protocol', was used instead of the correct parameter, 'security.protocol', and the session was not created.

In the log file an ERROR message saying, "Failed to set protocol..." appears instead of an INFO message saying, "Session 'Kafka': was created successfully".

2020-11-25 16:03:00,882 UTC INFO [Kafka_TA] 7504 process logon for session 'Kafka'
2020-11-25 16:03:00,882 UTC INFO [Kafka_TA] 7504 Session 'Kafka': Is about to be created with parameters:

Consumer.Topics = outputTopic
Consumer.group.id = ID
FIXVersion = FIX44
Producer.Topic = topic
Serializer = Raw
bootstrap.servers = localhost:9092
protocol = SSL

2020-11-25 16:03:00,894 UTC ERROR [Kafka_TA] 7504 Failed to set protocol: No such configuration property: "protocol"
2020-11-25 16:03:00,894 UTC INFO [Kafka_TA] 7504 process logon for session 'Kafka2'
2020-11-25 16:03:00,895 UTC INFO [Kafka_TA] 7504 Session 'Kafka2': Is about to be created with parameters:

Consumer.Topics = topic
Consumer.group.id = ID
FIXVersion = FIX44
Producer.Topic = outputTopic
Serializer = Raw
bootstrap.servers = localhost:9092

2020-11-25 16:03:00,922 UTC INFO [CC_Layer] 7504 Client Kafka2 has logged in

2020-11-25 16:03:00,936 UTC INFO [Kafka_TA] 7504 Session 'Kafka2': was created successfully
2020-11-25 16:03:00,936 UTC INFO [Kafka_TA] 7504 Kafka Adaptor v.0.1 started.

Adapter not sending or receiving messages

If sessions have been created successfully but the adapter isn't sending or receiving messages to/from the Kafka server, the issue has most likely occurred due to a problem with the connection.

If the adapter can't connect to the Kafka server, it will keep making connection attempts at the configured intervals until the cause of the error is resolved. Until then, the TA will not be able to send or receive messages. The default level of logging doesn't show the reason; a deeper level of logging needs to be enabled.

To establish what the error is, you must enable the DEBUG logging as follows:

  1. Open the FIXEdge.properties file
  2. Set the parameter Log.DebugIsOn = True

Once this is done, the log will show error messages stating previous connection attempts and the reason for the error. To resolve this issue, correct the configuration issue specified in the error message.


In this example, the wrong server port, 9093 (for SSL connection), is configured instead of the correct one, 9092 (for PLAINTEXT connection).

With DEBUG logging enabled, we can see that the adapter is permanently trying to connect to the server, as well as an error message specifying the configuration issue.

2020-11-25 16:22:54,533 UTC   DEBUG   [Kafka_TA]  5808  Session 'Kafka': Producer: librdkafka ERROR: [thrd:app]: rdkafka#producer-2: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 197ms in state APIVERSION_QUERY)
2020-11-25 16:22:54,534 UTC   DEBUG   [Kafka_TA]  10912  Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down
2020-11-25 16:22:54,534 UTC   DEBUG   [Kafka_TA]  22440  Session 'Kafka': Consumer: librdkafka ERROR: [thrd:app]: rdkafka#consumer-1: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1086ms in state APIVERSION_QUERY)
2020-11-25 16:22:54,761 UTC   DEBUG   [Kafka_TA]  10912  Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down
2020-11-25 16:22:55,033 UTC   DEBUG   [Kafka_TA]  24160  Session 'Kafka': Producer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down
2020-11-25 16:23:07,261 UTC   DEBUG   [Kafka_TA]  22440  Session 'Kafka': Consumer: librdkafka ERROR: [thrd:app]: rdkafka#consumer-1: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)
2020-11-25 16:23:07,261 UTC   DEBUG   [Kafka_TA]  10912  Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down
2020-11-25 16:23:15,441 UTC   DEBUG   [Kafka_TA]  24160  Session 'Kafka': Producer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down
2020-11-25 16:23:15,442 UTC   DEBUG   [Kafka_TA]  5808  Session 'Kafka': Producer: librdkafka ERROR: [thrd:app]: rdkafka#producer-2: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)

Messages not delivered

If the adapter is receiving messages from the Kafka server, but not delivering them to target sessions, Business Layer rules have either been improperly configured or have not been specified.

In order to see an error message specifying the issue, DEBUG logging must be enabled. 

To enable DEBUG logging:

  1. Open the FIXEdge.properties file
  2. Set the parameter Log.DebugIsOn = True



With DEBUG logging enabled, we can see that the message was received, but message processing failed because no targets were configured in the BL. Business Layer rules for the 'Kafka2' session have not been specified.

2020-11-25 16:35:36,545 UTC DEBUG [Kafka_TA] 21964 Session 'Kafka2': Consumer: Receiving the message: 8=FIX.4.49=15235=D49=SC56=FE34=252=20201125-16:35:35.936212=4213=test11=BTC/USD_LimitB_GTC55=BTC/USD54=160=20160401-15:15:58.08038=0.1540=244=630.159=110=078
2020-11-25 16:35:36,545 UTC DEBUG [BL_RoutingTable] 21964 No BL rules found for message with ClientID 'Kafka2', executing DefaultRule.
2020-11-25 16:35:36,546 UTC DEBUG [CC_Layer] 21964 BL has processed a message. Number of client IDs for delivery :0. Number or FIX sessions for delivery :0.. Number or sources identifiers for delivery :0.

2020-11-25 16:35:36,546 UTC DEBUG [Kafka_TA] 21964 Session 'Kafka2': Consumer: Message processing failed