(Available starting from version 6.10.0 of FIXEdge C++)
Overview
The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server connectivity to the Kafka distributed streaming platform.
Internally, its implementation uses the Kafka Consumer and Producer APIs.
The Kafka TA acts as a FIXEdge plugin and establishes a connection to a single Kafka instance or a Kafka cluster. The adapter supports TLS connectivity.
The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.
Kafka TA performance is close to Kafka's native processing speed: on average, 30,000 or more FIX messages are read/written per second. This number refers to the overall throughput capacity of the FIXEdge server with the Kafka TA (from a FIX session to the Kafka server).
The Kafka TA automatically reconnects to Kafka if the connection to a message broker is unexpectedly terminated. Reconnection behavior can be configured by the parameters specified in the Configuration parameters section below.
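For illustration only, a hypothetical sketch of reconnection tuning, assuming the adapter passes standard Kafka client reconnection settings through the session prefix (this pass-through is an assumption; the actual parameter names must be taken from the Configuration parameters section below):
TransportLayer.KafkaTA.Kafka.reconnect.backoff.ms = 100
TransportLayer.KafkaTA.Kafka.reconnect.backoff.max.ms = 10000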
Concept
The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.
Communication is provided between the FIXEdge server and a Kafka broker or cluster, with the Kafka TA acting on the FIXEdge side.
The Kafka TA allows integration with Kafka via the Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push and pull messages to/from brokers housed in Kafka, which organize and store messages according to topics. Brokers commit messages to disk storage and retain them for a configurable period of time or amount of space, set per topic. Topics are further broken down into partitions, with each partition acting as a separate commit log. The broker keeps track of consumers reading partitions by assigning offsets, thereby providing guaranteed ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.
The Kafka TA differs from other Transport Adapters in providing custom serialization/deserialization and durable message storage.
The diagram below represents the interaction between FIXEdge and a customer via the Kafka solution.
Basic elements of the Kafka connectivity model
- Producer = publisher, sender of messages
- Consumer = subscriber, reader of messages
- Message/event = a single unit of data
- Broker = a server forming a storage layer containing one or more topics
- Topic = ordered collection of events that are organized and stored in a durable way
- Partition = buckets located within topics across which messages are distributed
- Commit log = a record of changes that is committed to disk and can be replayed by consumers
- Guaranteed ordering = a guarantee that a consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written
- Offset = a tracking feature that helps consumers remember their position when last reading a partition
Client behavior
When several clients (each acting as either a Producer or a Consumer) are configured for the Kafka TA, all of them establish a connection to the Kafka platform when FIXEdge starts.
If one of the clients fails to establish a connection, the remaining clients are not affected by this failure.
Delivery
The Kafka TA provides guaranteed message delivery.
The Kafka TA service discipline is FIFO within the scope of one Kafka partition (for messages that come from one client). FIX messages that come from one FIX session preserve their order when passed to the message broker. FIX messages that arrive from a single queue of the message broker preserve their order in the FIX session.
FIX messages are sent to Kafka via the Producer API in the same order in which FIXEdge sends them to the client. FIXEdge fetches FIX messages via the Consumer API in the same order in which they arrive at the particular Kafka topic.
If the "Commit" parameter is not configured or is set to "Auto" for a Consumer, then, after Kafka delivers a message to FIXEdge, the Kafka TA sends the commit to Kafka when the configured time interval expires.
If the "group.id" parameter is not configured, the session ID is used as the value of the Kafka TA "group.id" parameter.
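For example, using the property pattern from the configuration sample in the Configuration steps section below, an explicit consumer group can be set as follows (the value "ID" is only a placeholder):
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
If this line is omitted, the session ID ("Kafka" in the sample) is used as the group id.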
Async message handling
When messages arrive at the particular Kafka topic in the defined order, they are received by FIXEdge via the Consumer API and become available at the FIXEdge Business Layer in the same order.
When the "Commit" parameter is set to "Sync" for a Consumer, Kafka delivers a message to FIXEdge, and the FIXEdge BL reports to the Kafka TA that the message has been delivered, then the Kafka TA sends the commit to Kafka.
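For example, using the property pattern from the configuration sample in the Configuration steps section below, synchronous commits for the "Kafka" session are enabled as follows:
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Sync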
Configuration steps
To set up the Kafka TA for FIXEdge, the following steps have to be executed:
- Copy the FIXEdge distribution package to the FIXEdge home directory:
Unpack the content of the distribution package to the FIXEdge root directory, e.g. to C:\FIXEdge.
- Enable the transport adapter to be used by FIXEdge:
In the 'Transport Layer Section' of the FIXEdge.properties file, add the Kafka TA to the list of supported adapters:
TransportLayer.TransportAdapters = TransportLayer.KafkaTA
Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:
TransportLayer.TransportAdapters = TransportLayer.SmtpTA, TransportLayer.RestTA, TransportLayer.KafkaTA
- Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.KafkaTA.Sessions = Kafka
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).
- Configure rules for message routing from the Kafka TA.
The Kafka TA client is referenced in the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section.
- Restart the FIXEdge server to apply the changes.
Configuration parameters
Connection to the Kafka platform can be established when the Kafka TA is properly configured with the Kafka broker address (the bootstrap.servers parameter).
Configuration sample
The samples below represent the Kafka TA configuration with the minimal set of parameters required to run the adapter:
Kafka TA configuration file:
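The settings below repeat the minimal configuration shown in the Configuration steps section above:
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.KafkaTA.Sessions = Kafka
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
TransportLayer.KafkaTA.Kafka.Producer.Topic = topic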
Kafka Business Layer rules:
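A minimal illustrative sketch of routing rules between a FIX session and the Kafka client: the element and attribute names follow the general FIXEdge BL_Config.xml conventions, and the session CompIDs are assumptions, so adjust both to your environment:
<BusinessLayer>
    <!-- Assumed rule: route messages from a FIX session to the Kafka client declared in FIXEdge.properties -->
    <Rule>
        <Source>
            <FixSession SenderCompID="Client" TargetCompID="FIXEDGE"/>
        </Source>
        <Action>
            <Send><Client Name="Kafka"/></Send>
        </Action>
    </Rule>
    <!-- Assumed rule: route messages received from the Kafka client back to the FIX session -->
    <Rule>
        <Source>
            <Client Name="Kafka"/>
        </Source>
        <Action>
            <Send><FixSession SenderCompID="FIXEDGE" TargetCompID="Client"/></Send>
        </Action>
    </Rule>
</BusinessLayer>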
Logging
Kafka exhaustive logging means that any action involving a configuration parameter is logged.
The list of logged actions:
- Adapter initialization
- Parsing validation errors
- Configuration parameters
- Sent and received messages
The following levels of logging are used for Kafka:
- ERROR - is always ON
- INFO - can be optionally ON
- WARN - can be optionally ON
- DEBUG - is turned off by default and is used for investigating critical cases
Any configuration event is logged in the "Kafka_TA" log category in the log file as INFO messages.
Log type | Event description | Format |
---|---|---|
ERROR | Client error | "Session '<session_name>': [Consumer/Producer]: <error_message>" |
INFO | Client creation | |
WARN | Kafka TA fails to apply any config parameter | Message that the given parameter is not specified or has a wrong value |
DEBUG | Creation of a Consumer or Producer; putting an outgoing message in the queue; taking an outgoing message from the queue | "Session '<session_name>': Producer: The message is put to the outgoing queue: <message>" <br> "Session '<session_name>': Producer: Sending the message via the producer: <message>" |
Logging setup
The Kafka TA writes messages to log under the 'Kafka_TA' log category.
By default the Kafka TA puts log messages into the FIXEdge log. Execute the following steps to configure the Kafka TA logging into a separate file:
- Open the FIXEdge.properties file.
- Add the 'Log.Kafka_TA.File.Name' property with the path and name of the Kafka TA log file as its value:
Log.Kafka_TA.File.Name = Kafka.log
- Restart the FIXEdge server.
When the Kafka TA starts, the following record will be written into the log file (xxx is the Kafka TA version):
[NOTE] 20070215-11:20:12.437 - Kafka Adaptor v.xxx started.
Additional information about logging configuration can be found here:
- FIXEdge logs format
- How to redirect FIX Antenna and/or FIXEdge logging to Syslog instead of files
- How to divide different categories and severities of log files into different files in the Logging section
Custom serialization
When FIX messages are transferred in the raw format (i.e. serialization is not applied), the Kafka TA transfers the message as a plain string with the tag values converted to strings.
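For illustration, an assumed raw payload as it would be written to or read from a Kafka topic (the field values are arbitrary and not taken from this document; SOH delimiters are shown as '|'):
8=FIX.4.4|9=...|35=D|49=Client|56=FIXEDGE|34=12|52=20240101-10:00:00|11=Order123|55=MSFT|54=1|38=100|40=2|10=...|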
The serialized format of communication is fully supported by the Kafka TA. Custom serialization requires specially designed external code and covers all Kafka message parts:
- Key
- Value
- Header(-s)
Serialization on sending
When custom serialization is applied and FIXEdge sends a message to the particular Kafka topic, the message is serialized via the custom serializer and sent to that topic via the Producer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
Serialization on receiving
When custom serialization is applied and a new message arrives at the particular Kafka topic, the message is received by FIXEdge via the Consumer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
The message is deserialized via the custom serializer.
Message key processing
In the case of default serialization, the tag whose value is used as the message key is configured in the "KeyTag" parameter, and the specified key is passed to the Kafka platform along with the message sent to the particular Kafka topic.
In the case of custom serialization, the custom plugin produces the key, which is passed to the Kafka platform along with the message sent to the particular Kafka topic.
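For example, a hypothetical sketch for the default serialization case, assuming the "KeyTag" parameter follows the same producer property pattern as the configuration sample (tag 11, ClOrdID, is only an illustrative choice):
TransportLayer.KafkaTA.Kafka.Producer.KeyTag = 11
With this assumed setting, the ClOrdID (11) value of each outgoing FIX message would be passed to Kafka as the message key.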
Custom partitioning
If custom partitioning is configured, the custom plugin produces the partition ID based on tag values of a FIX message.
Message Content Wrapping
When the BL Client is configured to work with XMLData (213), and FIXEdge sends a message to the particular Kafka topic, and the XMLData (213) field exists in the FIX message and is not empty, then the value of the XMLData (213) field is sent to the particular Kafka topic via the Producer API.
When the BL Client is configured to work with XMLData (213), and FIXEdge sends a message to the particular Kafka topic, and the XMLData (213) field does not exist in the FIX message or is empty, then nothing is sent to the Kafka topic and the corresponding error is sent to the Business Layer.
When the BL Client is configured to work with XMLData (213), and Kafka sends a message to FIXEdge, then FIXEdge receives a FIX message with 35=n (XML message), in which the XMLData (213) field is filled with the received message and the XMLDataLen (212) field is filled with the received message length.
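For illustration, with an assumed payload that is not taken from this document: if Kafka delivers the 9-character payload <Report/>, the FIX message received by the Business Layer would contain 35=n, 212=9, and 213=<Report/>.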
Troubleshooting
Common troubleshooting issues include:
Failure to initialize
If the adapter fails to start up, there is an issue with a configuration parameter.
Check the log for error messages; to resolve the issue, correct the configuration parameter specified in the error message.
Session wasn't created
If, during start-up, one session doesn't load successfully while others are created successfully, there is an issue with a configuration parameter.
Check the log for error messages; to resolve the issue, correct the configuration parameter specified in the error message.
Adapter not sending or receiving messages
If sessions have been created successfully, but the adapter isn't sending or receiving messages to/from the Kafka server, the issue has most likely been caused by a connection problem.
If the adapter cannot connect to the Kafka server, it keeps making connection attempts at the configured intervals until the cause of the error is identified and fixed. Until then, the TA is not able to send or receive messages.
To establish what the error is, you must enable DEBUG logging as follows:
- Open the FIXEdge.properties file
- Set the parameter Log.DebugIsOn = True
Once this is done, the log will show error messages describing previous connection attempts and the reason for the failure. To resolve the issue, correct the configuration problem specified in the error message.
Messages not delivered
If the adapter is receiving messages from the Kafka server, but not delivering them to target sessions, Business Layer rules have either been improperly configured or have not been specified.
In order to see an error message specifying the issue, DEBUG logging must be enabled.
To enable DEBUG logging:
- Open the FIXEdge.properties file
- Set the parameter Log.DebugIsOn = True