Overview
The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server connectivity to the Kafka distributed streaming platform.
Internally, its implementation uses the Kafka Consumer and Producer APIs.
The Kafka TA acts as a FIXEdge plugin and establishes a connection to a Kafka single instance or a Kafka cluster. The adapter supports TLS connectivity.
The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.
Kafka TA performance is close to Kafka's native processing speed. On average, the adapter reads/writes 30,000 or more FIX messages per second. This number refers to the overall throughput capacity of the FIXEdge server with the Kafka TA (from FIX session to Kafka server).
The Kafka TA automatically reconnects to Kafka if the connection to a message broker was unexpectedly terminated. Reconnection behavior can be configured by parameters specified in the Configuration parameters section below.
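Assuming the adapter forwards standard Kafka client (librdkafka-style) properties under the session prefix, as it does for bootstrap.servers, the reconnection backoff could be tuned with a fragment like the following (property pass-through is an assumption; check the Configuration parameters section for the officially supported names):

```properties
# Assumed pass-through of standard Kafka client reconnect settings
# (librdkafka property names) under the session prefix:
TransportLayer.KafkaTA.Kafka.reconnect.backoff.ms = 100
TransportLayer.KafkaTA.Kafka.reconnect.backoff.max.ms = 10000
```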
Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge.
When the FIXEdge server is terminated, the Kafka TA shuts down gracefully, ensuring that in-flight message processing operations complete correctly, completely, and without message loss.
FIFO (First In, First Out) ordering is guaranteed, meaning that the order in which messages were sent and received is maintained across termination of the server instance.
Concept
The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.
On the FIXEdge side, the Kafka TA provides communication between the FIXEdge server and a Kafka broker or cluster.
The Kafka TA allows integration with Kafka via the Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push and pull messages to/from Kafka brokers, which organize and store messages by topic. Brokers commit messages to disk storage for a customizable retention period or size, configurable per topic. Topics are further broken down into partitions, with each partition acting as a separate commit log. The broker keeps track of each consumer's read position in a partition by means of offsets, thereby providing guaranteed ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.
The Kafka TA differentiates from other Transport Adapters in providing custom serialization/deserialization and durable message storage.
The schema below represents an interaction between FIXEdge and a customer's infrastructure via the Kafka solution.
Basic elements of the Kafka connectivity model
- Producer = publisher, the sender of messages
- Consumer = subscriber, the reader of messages
- Message/event = a single unit of data
- Broker = a server forming a storage layer containing one or more topics
- Topic = ordered collection of events that are organized and stored in a durable way
- Partition = one of the buckets within a topic across which messages are distributed
- Commit log = a record of changes that is committed to disk and can be replayed by consumers
- Guaranteed ordering = a guarantee that a consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written
- Offset = a tracking feature that lets a consumer remember its last read position in a partition
Clients behavior
When several clients (one Producer, one Consumer, or both) are configured for the Kafka TA, all of them establish a connection to the Kafka platform when FIXEdge starts.
If one of the clients fails to establish a connection, the remaining clients are not affected by this failure.
Delivery
The Kafka TA provides guaranteed message delivery. The connectivity mechanism preserves FIX messages even if critical situations occur, such as:
- FIXEdge server is terminated
- Kafka TA crashes or does not start due to invalid configuration
- FIX Session is terminated non-gracefully
- message broker is down
- network failure occurs
If any unsent messages are found in the Persistent Messages Storage, the preserved messages are processed (before any other runtime-messages) as soon as the FIXEdge server starts and the Kafka connection is re-established.
Kafka TA services discipline is FIFO in the scope of one Kafka partition (for messages that come from one client). FIX messages that come from one FIX session preserve their order when passed to the message broker. FIX messages that arrive from a single queue from the message broker preserve their order in the FIX session.
FIX messages to Kafka via Producer API are sent in the same order in which FIXEdge sends the messages to the Client (sync messaging). FIXEdge fetches FIX messages via the Consumer API in the same order in which messages arrive at the particular Kafka topic.
When the "group.id" parameter is not configured, the session ID is used as the value of the Kafka TA "group.id" parameter.
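The fallback can be illustrated with the session named "Kafka" from the Configuration steps section (the group name below is made up):

```properties
# Explicit consumer group:
TransportLayer.KafkaTA.Kafka.Consumer.group.id = MyGroupID
# If this line is omitted, the adapter falls back to the session ID,
# i.e. this consumer joins the group "Kafka"
```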
Committing to Kafka
When the "Commit" Kafka TA parameter is not configured or set to "Auto" for the Consumer and Kafka sends a message to FIXEdge, the Kafka TA sends the commit to Kafka after the expiration of the configured time interval.
When the "Commit" Kafka TA parameter is set to "Sync" for the Consumer and Kafka sends a message to FIXEdge, the Kafka TA sends the commit to Kafka when the FIXEdge BL reports to the Kafka TA that the message is delivered.
When the "Commit" Kafka TA parameter is set to "Async" for the Consumer and Kafka sends a message to FIXEdge, the Kafka TA sends the commit to Kafka asynchronously.
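The three commit modes map to the "Consumer.Commit" parameter shown in the configuration sample; a minimal sketch for the session named "Kafka":

```properties
# Auto (default): commit is sent after the configured time interval expires
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
# Sync: commit is sent once the FIXEdge BL reports the message as delivered
#TransportLayer.KafkaTA.Kafka.Consumer.Commit = Sync
# Async: commit is sent asynchronously, without waiting for delivery confirmation
#TransportLayer.KafkaTA.Kafka.Consumer.Commit = Async
```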
Async message handling
Sending FIX messages from FIXEdge to Kafka is regarded as async messaging. Messages are saved to Persistent Messages Storage.
Sync message handling
Sending FIX messages from Kafka to FIXEdge is regarded as sync messaging. Messages are not saved to any storage. They become available at the FIXEdge Business Layer.
When a message arrives at the particular Kafka topic in the defined order, the messages are fetched by FIXEdge via the Consumer API and the messages are available at the FIXEdge Business Layer in the same order.
Configuration steps
To set up the Kafka TA for FIXEdge, the following steps have to be executed:
- Copy the FIXEdge distribution package to the FIXEdge home directory:
  Unpack the content of the distribution package to the FIXEdge root directory, e.g. to C:\FIXEdge.
- Enable the transport adapter to be used by FIXEdge:
  In the 'Transport Layer Section' of the FIXEdge.properties file, add the Kafka TA to the list of supported adapters:
  TransportLayer.TransportAdapters = TransportLayer.KafkaTA
Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:
TransportLayer.TransportAdapters = TransportLayer.SmtpTA, TransportLayer.RestTA, TransportLayer.KafkaTA
- Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:
  TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
  TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
  TransportLayer.KafkaTA.Sessions = Kafka
  TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
  TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
  TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
  TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
  TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
  TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).
- Configure rules for message routing from the Kafka TA.
  The Kafka TA Client is referenced in the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section.
- Restart the FIXEdge server to apply the changes.
Configuring multiple adapter instances
Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge.
To run multiple instances of the Kafka TA, each new instance must be assigned a new name and new session names. The name assigned to the new adapter instance must be consistent across parameters relating to that instance. New session names must be unique across all existing instances of the adapter.
TransportLayer.Kafka1.Description = Kafka Transport Adaptor
TransportLayer.Kafka1.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.Kafka1.Sessions = KafkaA
TransportLayer.Kafka1.KafkaA.bootstrap.servers = localhost:9092
TransportLayer.Kafka1.KafkaA.FIXVersion = FIX44
TransportLayer.Kafka1.KafkaA.Consumer.Commit = Auto
TransportLayer.Kafka1.KafkaA.Consumer.Topics = outputTopic
TransportLayer.Kafka1.KafkaA.Consumer.group.id = ID
TransportLayer.Kafka1.KafkaA.Producer.Topic = topic
In the above example, the new adapter instance is called "Kafka1" and the unique session name is assigned as "KafkaA".
Configuration parameters
Connection to the Kafka platform can be established when the Kafka TA is properly configured with the Kafka broker address (the "bootstrap.servers" parameter).
Configuration sample
The samples below represent the Kafka TA's configuration with the minimal set of parameters required to run the adaptor:
Kafka TA configuration file:
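A minimal FIXEdge.properties fragment, repeating the sample from the Configuration steps section:

```properties
TransportLayer.TransportAdapters = TransportLayer.KafkaTA

TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.KafkaTA.Sessions = Kafka
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
```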
Kafka Business Layer rules:
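The sketch below is an illustrative guess at the usual FIXEdge Business Layer rule shape, routing messages between a FIX session and the Kafka client named "Kafka"; element and attribute names, as well as the session CompIDs, are assumptions and should be verified against your BL_Config.xml:

```xml
<!-- Illustrative sketch only: verify element/attribute names against BL_Config.xml -->
<Rule Description="FIX session to Kafka">
    <Source>
        <FixSession SenderCompID="Client" TargetCompID="FIXEDGE"/>
    </Source>
    <Action>
        <Send><Client Name="Kafka"/></Send>
    </Action>
</Rule>
<Rule Description="Kafka to FIX session">
    <Source>
        <Client Name="Kafka"/>
    </Source>
    <Action>
        <Send><FixSession SenderCompID="FIXEDGE" TargetCompID="Client"/></Send>
    </Action>
</Rule>
```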
Authentication Configuration
SSL authentication
To configure SSL authentication, follow these steps:
- Make sure the Kafka broker and adapter are configured for SSL connection (link to instructions to be provided soon)
- Set client authentication as "required" in the server.properties file
- Provide ssl.key details in the FIXEdge.properties file
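The steps above could be sketched as follows. The broker-side property is standard Kafka; the adapter-side names assume the TA passes librdkafka-style SSL settings through under the session prefix (as it does for bootstrap.servers), and all paths and the password are made up:

```properties
# Kafka broker (server.properties): require client certificates
ssl.client.auth=required

# FIXEdge.properties (assumed librdkafka-style client settings):
TransportLayer.KafkaTA.Kafka.security.protocol = SSL
TransportLayer.KafkaTA.Kafka.ssl.ca.location = /etc/kafka/ca-cert.pem
TransportLayer.KafkaTA.Kafka.ssl.certificate.location = /etc/kafka/client-cert.pem
TransportLayer.KafkaTA.Kafka.ssl.key.location = /etc/kafka/client-key.pem
TransportLayer.KafkaTA.Kafka.ssl.key.password = secret
```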
SASL_PLAIN authentication
To configure SASL_PLAIN authentication:
- Use the following files and corresponding settings to install the Kafka broker:
- Use the following file and corresponding settings to configure the Kafka TA:
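As a sketch of both sides: the broker lines are standard Kafka SASL/PLAIN settings, while the adapter lines assume librdkafka-style pass-through under the session prefix; listener port, username, and password are made up:

```properties
# Kafka broker (server.properties):
listeners=SASL_PLAINTEXT://0.0.0.0:9093
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN

# FIXEdge.properties (assumed librdkafka-style client settings):
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_PLAINTEXT
TransportLayer.KafkaTA.Kafka.sasl.mechanisms = PLAIN
TransportLayer.KafkaTA.Kafka.sasl.username = client
TransportLayer.KafkaTA.Kafka.sasl.password = client-secret
```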
SASL_SSL authentication
This is a combination of an SSL connection with client authentication and SASL_PLAIN authentication.
To configure SASL_SSL authentication:
- Use the following files and corresponding settings to install the Kafka broker:
- Use the following file and corresponding settings to configure the Kafka TA:
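Since SASL_SSL combines the two previous setups, the client side is a union of the SSL and SASL_PLAIN fragments; as before, property names assume librdkafka-style pass-through, and credentials and paths are made up:

```properties
# FIXEdge.properties (assumed librdkafka-style client settings):
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_SSL
TransportLayer.KafkaTA.Kafka.sasl.mechanisms = PLAIN
TransportLayer.KafkaTA.Kafka.sasl.username = client
TransportLayer.KafkaTA.Kafka.sasl.password = client-secret
TransportLayer.KafkaTA.Kafka.ssl.ca.location = /etc/kafka/ca-cert.pem
TransportLayer.KafkaTA.Kafka.ssl.certificate.location = /etc/kafka/client-cert.pem
TransportLayer.KafkaTA.Kafka.ssl.key.location = /etc/kafka/client-key.pem
```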
SASL_GSSAPI authentication
To configure SASL_GSSAPI authentication:
- Use the following files and corresponding settings to install the Kafka broker:
- Use the following file and corresponding settings to configure the Kafka TA:
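A Kerberos sketch: the broker lines are standard Kafka GSSAPI settings, the adapter lines assume librdkafka-style pass-through, and the service name, keytab path, and principal are made up:

```properties
# Kafka broker (server.properties):
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka

# FIXEdge.properties (assumed librdkafka-style client settings):
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_PLAINTEXT
TransportLayer.KafkaTA.Kafka.sasl.mechanisms = GSSAPI
TransportLayer.KafkaTA.Kafka.sasl.kerberos.service.name = kafka
TransportLayer.KafkaTA.Kafka.sasl.kerberos.keytab = /etc/krb5/kafka-client.keytab
TransportLayer.KafkaTA.Kafka.sasl.kerberos.principal = fixedge@EXAMPLE.COM
```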
Logging
The Kafka TA provides exhaustive logging: every action involving a configuration parameter is logged.
List of logged actions:
- Adapter initialization
- Parsing validation errors
- Configuration parameters
- Sent and received messages
The following levels of logging are used for Kafka:
- ERROR - is always ON
- INFO - can be optionally ON
- WARN - can be optionally ON
- DEBUG - is turned off by default and is used in critical cases
Any configuration event is logged in the "Kafka_TA" log category in the log file via the INFO messages.
The table below specifies the most common logging messages for Kafka.
| Log type | Event description | Logging message format |
|---|---|---|
| ERROR | Client error | "Session '<session_name>': [Consumer/Producer]: <error_message>" |
| ERROR | Outgoing message failed to be queued | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" |
| ERROR | Incoming message failed to be handled | "Session '<session_name>': Consumer: Error handling the incoming message: <native error>. Message (optional): <message>" |
| INFO | Client creation / any config parameter change | |
| WARN | Kafka TA fails to apply any config parameter | "Session '<session_name>': <warning_message>" |
| WARN | The outgoing message is rejected because the client is not connected to Kafka | "Session '<session_name>': Producer: The outgoing message is rejected: <message>" (visible in the "Kafka_TA" log category) |
| DEBUG | Creation of a Consumer or Producer | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" |
| DEBUG | Adding an outgoing message to the queue | "Session '<session_name>': Producer: The message is put to the outgoing queue: <message>" |
| DEBUG | Removing an outgoing message from the queue | "Session '<session_name>': Producer: Sending the message via the producer: <message>" |
| DEBUG | An incoming message is being received | "Session '<session_name>': Consumer: Receiving the message: <message>" |
Logging setup
The Kafka TA writes messages to the log under the 'Kafka_TA' log category.
By default, the Kafka TA puts log messages into the FIXEdge log. Execute the following steps to configure the Kafka TA logging into a separate file:
- Open the FIXEdge properties file.
- Add the 'Log.Kafka_TA.File.Name' property, with the path and name of the Kafka TA log file as its value:
  Log.Kafka_TA.File.Name = Kafka.log
- Restart the FIXEdge server.
When the Kafka TA starts, the following record will be written into the log file (xxx is the Kafka TA version):
[NOTE] 20070215-11:20:12.437 - Kafka Adaptor v.xxx started.
Additional information about logging configuration can be found here:
- FIXEdge logs format
- How to redirect FIX Antenna and/or FIXEdge logging to Syslog instead of files
- How to divide different categories and severities of log files into different files in the Logging section
Custom serialization
When FIX messages are transferred in raw format (i.e. serialization is not applied), the Kafka TA assumes the message is a plain string of tag=value pairs.
The serialized format of communication is fully supported by the Kafka TA. Custom serialization implies the specially designed external code and includes serialization of all Kafka message parts:
- Key
- Value
- Header(-s)
Serialization on sending
When custom serialization is applied and FIXEdge sends the message to the particular Kafka topic, this message is serialized via the custom serializer and is sent to a particular Kafka topic via the Producer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
Serialization on receiving
When custom serialization is applied and the new message arrives at the particular Kafka topic, this message is received by FIXEdge via the Consumer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
The message is deserialized via the custom serializer.
Message key processing
In the case of the default serialization, the Kafka TA produces a key using the value of the tag configured in the "KeyTag" parameter. The specified key is passed to the Kafka platform along with the message sent to the particular Kafka topic.
In the case of the custom serialization, the custom plugin produces the key that is passed to the Kafka platform along with the message sent to the particular Kafka topic.
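For the default serialization, key selection can be illustrated with the "KeyTag" parameter; the placement under the Producer prefix and the chosen tag are assumptions (tag 11 is ClOrdID in standard FIX):

```properties
# Use tag 11 (ClOrdID) of the outgoing FIX message as the Kafka message key
TransportLayer.KafkaTA.Kafka.Producer.KeyTag = 11
```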
Custom partitioning
If custom partitioning is configured, the custom plugin produces the partition ID based on the tag values of a FIX message.
Message Content Wrapping
When FIXEdge sends a message to a particular Kafka topic and the BL Client is configured to work with XMLData (213), two flows are possible:
- The XMLData (213) field exists in the FIX message and is not empty.
  In this case, the XML message extracted from the XMLData (213) field is sent to the particular Kafka topic via the Producer API.
- The XMLData (213) field does not exist in the FIX message or is empty.
  In this case, the corresponding error is sent to the Business Layer and nothing is sent to a Kafka topic via the Producer API.
When Kafka sends an XML message to FIXEdge and the BL Client is configured to work with XMLData (213), the FIX message with 35=n (XML message, JSON plain text, etc.) is received by FIXEdge, the XMLData (213) field is filled in with the received XML message, and the XMLDataLen (212) field is filled in with the received message length.
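An illustrative FIX 4.4 XML-message (35=n) as it might look after wrapping; all field values are made up, '|' stands for the SOH delimiter, and BodyLength (9) and CheckSum (10) are left as placeholders:

```
8=FIX.4.4|9=...|35=n|49=KAFKA|56=FIXEDGE|34=12|52=20240101-10:00:00|212=36|213=<order id="1"><qty>100</qty></order>|10=...|
```

Here XMLDataLen (212) carries the length of the payload placed in XMLData (213), 36 characters in this example.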
Session Monitoring
When the Kafka TA is in use, the following information about the adapter’s state will be available:
| Name shown | Description |
|---|---|
| Messages received during last session | Number of incoming messages from Kafka handled per client, beginning from the client’s current working session |
| Messages received total | Number of incoming messages from Kafka handled per client, beginning when FIXEdge started |
| Messages sent during last session | Number of outgoing messages to Kafka handled per client, beginning from the client’s current working session |
| Messages sent total | Number of outgoing messages to Kafka handled per client, beginning when FIXEdge started |
| Messages with errors during last session | Number of errors related to the Kafka TA, beginning from the client’s current working session |
| Messages with errors total | Number of errors related to the Kafka TA, beginning when FIXEdge started |
| Producer’s queue depth | Queue depth per producer |
| Status | Status of the client’s connection with Kafka |
Troubleshooting
Common troubleshooting issues include:
Failure to initialize
If the adapter fails to start up, there is an issue with a configuration parameter.
To resolve this issue, check the log for error messages and correct the configuration parameter specified there.
Session wasn't created
If, during start-up, one session doesn’t load successfully while others are created successfully, there is an issue with a configuration parameter.
To resolve this issue, check the log for error messages and correct the configuration parameter specified there.
Adapter not sending or receiving messages
If sessions have been created successfully, but the adapter isn’t sending or receiving messages to/from the Kafka server, this issue has most likely occurred due to a problem with the connection.
If the adapter can’t connect to the Kafka server, it will keep making connection attempts at given intervals until the cause of the error is resolved. Until then, the TA cannot send or receive messages. The default level of logging doesn't record the reason, so a deeper level of logging must be enabled.
To establish what the error is, you must enable the DEBUG logging as follows:
- Open the FIXEdge.properties file
- Set the parameter Log.DebugIsOn = True
Once this is done, the log will show error messages stating previous connection attempts and the reason for the error. To resolve this issue, correct the configuration issue specified in the error message.
Messages not delivered
If the adapter is receiving messages from the Kafka server, but not delivering them to target sessions, Business Layer rules have either been improperly configured or have not been specified.
In order to see an error message specifying the issue, DEBUG logging must be enabled.
To enable DEBUG logging:
- Open the FIXEdge.properties file
- Set the parameter Log.DebugIsOn = True