(Available starting from version 6.10.0 of FIXEdge C++)
Overview
The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server connectivity to the Kafka distributed streaming platform.
Internally, its implementation uses the Kafka Consumer and Producer APIs.
The Kafka TA acts as a FIXEdge plugin and establishes a connection to a Kafka single instance or a Kafka cluster.
The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.
Kafka TA performance is close to Kafka native speed of processing. The average FIX messages reading/writing per second is 30,000 or higher.
The Kafka TA automatically reconnects to Kafka if connection to a message broker was unexpectedly terminated. Reconnection is regulated by the reconnect.backoff.ms and reconnect.backoff.max.ms parameters specified in the Configuration parameters section below.
Concept
The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.
Communication is provided between the FIXEdge server and a Kafka broker or cluster on the FIXEdge side.
The Kafka TA allows integration with Kafka via Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push and pull messages to/from brokers housed in Kafka that organize and store messages according to topics. Brokers commit messages to disk storage for a customizable period of time or space, configurable according to topic. Topics are further broken down into partitions, with each partition acting as a separate commit log. A broker administrator keeps track of consumers when they read partitions by assigning offsets, thereby providing guaranteed-ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.
The Kafka TA differentiates from other Transport Adapters in providing scheduling, custom serialization and deserialization, and durable storage.
The scheme below represents interaction between FIXEdge and Customer via the Kafka solution.
Basic elements of the Kafka connectivity model
- Producer = publisher, sender of messages
- Consumer = subscriber, reader of messages
- Message/event = a single unit of data
- Broker = a server forming a storage layer containing one or more topics
- Topic = ordered collection of events that are organized and stored in a durable way
- Partition = buckets located within topics across which messages are distributed
- Commit log = a record of changes that is committed to disk and can be replayed by consumers
- Guaranteed ordering = a guarantee that a consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written
- Offset = tracking feature that helps consumers remember its position when last reading a partition
Clients behavior
When several clients (either one Producer or one Consumer) are configured for the Kafka TA, all of them establish connection to the Kafka platform when FIXEdge starts its work.
If one of the clients fails to establish connection, the remaining clients are not affected by this failure.
Delivery
The Kafka TA provides guaranteed message delivery.
FIX messages to Kafka via Producer API are sent in the same order in which FIXEdge sends the messages to the Client. FIXEdge receives (?) FIX messages via Consumer API in the same order in which messages arrive at the particular Kafka topic.
When the "Commit" parameter is not configure or set to "Auto" for Consumer, and Kafka sends the message to FIXEdge, then the Kafka TA sends the commit to Kafka when the configured time interval expires.
When the "GroupId" parameter is not configured and its value is to be assigned, the session ID is used as a value of the Kafka TA "GroupId" parameter.
Async message handling
When a message arrives at the particular Kafka topic in the defined order, the messages are received by FIXEdge via Consumer API and the messages are available at the FIXEdge Business Layer in the same order.
When the FIXEdge BL reports to the Kafka TA that the message is delivered and the "Commit" parameter is set to "Sync" for Consumer, and Kafka sends the message to FIXEdge, then the Kafka TA sends the commit to Kafka.
Configuration steps
To set up the Kafka TA for FIXEdge, the following steps have to be executed:
- Copy the FIXEdge distribution package to the FIX Edge home:
Unpack the content of the distribution package to the FIXEdge root directory, e.g. to C:\FIXEdge Enable a transport adapter to be used by FIXEdge:
In the ‘Transport Layer Section’ of the FIXEdge.properties, add the Kafka TA to the list of supported adapters:TransportLayer.TransportAdapters = TransportLayer.KafkaTA
Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:
TransportLayer.TransportAdapters = TransportLayer.SmtpTA, TransportLayer.RestTA, TransportLayer.KafkaTA
Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties:
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll TransportLayer.KafkaTA.Sessions = Kafka TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092 TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44 TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).
- Configure rules for message routing from the Kafka TA.
The KafkaTA Client is referred to the Business Layer (BL) by ClientID name specified in the FIXEdge.properties file. For Kafka Business Layer configuration sample, refer to the Configuration sample sub-section. - Restart the FIXEdge server to apply the changes.
Configuration parameters
Connection to the Kafka platform can be established when the Kafka TA is properly configured with the Kafka IP address.
Configuration sample
The samples below represent Kafka TA configuration with minimal set of parameters to run the adaptor:
Kafka TA configuration file:
Kafka Business Layer rules:
Logging
Logging messages differ by type:
- INFO
- DEBUG
- WARN
- ERROR
Any configuration action is logged in the "Kafka_TA" log category in the log file via the INFO messages. All the config parameters applied as a result of configuration are logged.
If the Kafka TA fails to apply any config parameter, the WARN message is logged in the log file of the "Kafka_TA" log category that the given parameter is not specified or has a wrong value.
Creation of a client is logged in the "Kafka_TA" category in the log file via the INFO messages.
Creation of a Consumer or Producer is logged in the "Kafka_TA" category in the log file via the DEBUG messages.
An error with a client is logged in the "Kafka_TA" category in the log file via the ERROR messages in the following format:
"Session '<session_name>': [Consumer/Producer]: <error_message>"
Putting an outgoing message in the queue is logged in the "Kafka_TA" category in the log file via the DEBUG messages in the following format:
"Session '<session_name>': Producer: The message is put to the outgoing queue: <message>"
Taking an outgoing message from the queue is logged in the "Kafka_TA" category in the log file via the DEBUG messages in the following format:
"Session '<session_name>': Producer: Sending the message via the producer: <message>"
Scheduling
The schedule settings regulate work of Kafka TA Producer/Consumer. The schedule can be configured as a cron expression or directly tied up to FIXEdge.
If the cron expression is used for scheduling, the Producer/Consumer starts using Client's connection to the Kafka platform at a scheduled time. At a scheduled disconnection time, the Producer/Consumer stops using the Client's connection to Kafka.
If the schedule settings are directly tied up to FIXEdge, the Producer/Consumer starts using Client's connection to the Kafka platform at the moment when the FIXEdge server starts. When the FIXEdge server stops, the Producer/Consumer stops using the Client's connection to Kafka.
Scheduling is represented by the following properties:
- TransportLayer.KafkaTA.<Session>.ConnectTime
- TransportLayer.KafkaTA.<Session>.DisconnectTime
Refer to the table in the Configuration section above.
Custom serialization
When FIX messages are transferred in the raw format (i.e. serialization is not applied), the Kafka TA implies a string containing the tag values converted to strings.
The serialized format of communication is fully supported by the Kafka TA. Custom serialization implies the specially designed external code and includes serialization of all Kafka message parts:
- Key
- Value
- Header(-s)
Serialization on sending
When custom serialization is applied and FIXEdge sends the message to the particular Kafka topic, this message is serialized via the custom serializer and is sent to the particular Kafka topic via Producer API. The following INFO message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
Serialization on receiving
When custom serialization is applied and the new message arrives at the particular Kafka topic, this message is received by FIXEdge via Consumer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
The message is deserialized via the custom serializer.
Message key processing
In case of the default serialization, the particular tag is configured in the keytag parameter and the specified key is passed to the Kafka platform along with the message sent to the particular Kafka topic.
In case of the custom serialization, the custom plugin produces the key which is passed to the Kafka platform along with the message sent to the particular Kafka topic.
Custom partitioning
If the custom partitioner is configured, the specified key and/or topic should be passed to the FIXEdge Business Layer via the configured tag of a FIX message.
Message Content Wrapping
When BL Client is configured to work with XMLData (213), and FIXEdge sends a message to the particular Kafka topic, and the XMLData (213) field exists in the FIX message and this field is not empty, then the value of the XMLData (213) field is sent to the particular Kafka topic via Producer API.
When BL Client is configured to work with XMLData (213), and FIXEdge sends a message to the particular Kafka topic, and the XMLData (213) field does not exist in the FIX message and this field is empty, then nothing is sent to the particular Kafka topic via Producer API. The corresponding error is sent to the Business Layer.
When BL Client is configured to work with XMLData (213), and Kafka sends a message to FIXEdge, then the FIX message with 35=n (XML message) is received by FIXEdge,
and the XMLData (213) field is filled in with the received message, and the XMLDataLen (212) field is filled in with the received message length.
Troubleshooting
Common troubleshooting issues include:
Failure to initialize
If the adapter fails to start-up, there is an issue with a configuration parameter.
To resolve this issue, the configuration parameter specified in the error message must be corrected. Check the log for error messages.
Session wasn't created
If, during start-up, one session doesn’t load successfully while others are created successfully, there is an issue with a configuration parameter.
To resolve this issue, the configuration parameter specified in the error message must be corrected. Check the log for error messages.
Adapter not sending or receiving messages
If sessions have been created successfully, but the adapter isn’t sending or receiving messages to/from the Kafka server, this issue has most likely occurred due to a problem with the connector.
If the adapter can’t connect to the Kafka server, it will continue to make connection attempts in given intervals until a reason for the error is established. Until this is done, the TA will not be able to send or receive messages, however, this issue will not be clearly stated in the logs.
To establish what the error is, you must enable DEBUG logging as follows:
- Open the FIXEdge.properties file
- Set the parameter Log.DebugIsOn = True
Once this is done, the log will show error messages stating previous connection attempts and the reason for the error. To resolve this issue, correct the configuration issue specified in the error message.
Messages not delivered
If the adapter is receiving messages from the Kafka server, but not delivering them to target sessions, Business Layer rules have either been improperly configured or have not been specified.
In order to see an error message specifying the issue, DEBUG logging must be enabled.
To enable DEBUG logging:
- Open the FIXEdge.properties file
- Set the parameter Log.DebugIsOn = True