Page tree
Skip to end of metadata
Go to start of metadata

Overview

FIXEdge/J provides a functionality for storing and forwarding messages in the event that a connection with one of the counterparties is dropped and messages are not delivered. These messages will be stored by the queue and forwarded when the receiving counterparty connection is re-established. E.g. if one of the counterparties is logged off and messages are being sent, FIXEdge/J can place the undelivered messages into the Store & Forward Queue and route them once the counterparty is logged on. 
The Store and Forward functionality:

  • can be enabled/disabled for a particular session in the FIXEdge/J
  • can be used for specific message types (business only)
  • can be used for specific messages by specifying a particular filter with tags values, e.g. 59=0
  • can define stale messages and remove them from the queue
  • has persisting options - in the memory and on the disk


Configuration

Enable Store and Forward functionality

Enable the Store and Forward queue for an endpoint

To enable a Store and Forward queue for a particular endpoint, use the storeAndForwardEndpoint.enabled=true/false property.

For example, Client A may want to store all business messages that he sends to Exchange. In this case, the Store and Forward queue should be enabled at the destination, (Exchange), where he sends messages to. 

s_fix_Exchange.properties
sessionType=acceptor
senderCompID=FIXEdgeJCompID
targetCompID=Exchange
storeAndForwardEndpoint.enabled=true

Note. To enable/disable the Store and Forward queue for all endpoints on FIXEdge/J, use the storeAndForwardEndpoint.enabled=true/false property in the s_fixDefault.properties.

Enable the Store and Forward functionality for the session

To enable the Store and Forward queue for a particular session, use the storeAndForward.enabled=true/false property.

For example, Client A may want to store all the business messages that he sends to Exchange. At the same time, Client B may not want to store the messages that he sends to the same Exchange. In this case, the Store and Forward functionality should be disabled for Client B. 

s_fix_Client_B.properties
sessionType=acceptor
senderCompID=FIXEdgeJCompID
targetCompID=ClientB
storeAndForward.enabled=false

Compatibility matrix


storeAndForward.enabled=truestoreAndForward.enabled=false
storeAndForwardEndpoint.enabled=true

Client A: storeAndForward.enabled=true

Client B: storeAndForward.enabled=true

Exchange: storeAndForwardEndpoint.enabled=true

Messages from Client A and Client B will be stored in the Exchange's Store and Forward queue

Client A: storeAndForward.enabled=true

Client B: storeAndForward.enabled=false

Exchange: storeAndForwardEndpoint.enabled=true

Messages from Client A will be stored in the Exchange's Store and Forward queue

Messages from Client B will not be stored in the Exchange's Store and Forward queue

Client A: storeAndForwardEndpoint.enabled=true

Client B: storeAndForwardEndpoint.enabled=true

Exchange: storeAndForward.enabled=true

Messages from the Exchange will be stored in the Clients Store and Forward queues

Client A: storeAndForwardEndpoint.enabled=true

Client B: storeAndForwardEndpoint.enabled=true

Exchange: storeAndForward.enabled=false

Messages from the Exchange will not be stored in the Clients Store and Forward queues

storeAndForwardEndpoint.enabled=false

Client A: storeAndForward.enabled=true

Client B: storeAndForward.enabled=true

Exchange: storeAndForwardEndpoint.enabled=false

Messages from Client A and Client B will not be stored in the Exchange's Store and Forward queue

Client A: storeAndForward.enabled=true

Client B: storeAndForward.enabled=false

Exchange: storeAndForwardEndpoint.enabled=false

Messages from Client A and Client B will not be stored in the Exchange's Store and Forward queue

Client A: storeAndForwardEndpoint.enabled=true

Client B: storeAndForwardEndpoint.enabled=false

Exchange: storeAndForward.enabled=true

Messages from the Exchange will be stored in the Client A Store and Forward queue

Messages from the Exchange will not be stored in the Client B Store and Forward queue

Client A: storeAndForwardEndpoint.enabled=true

Client B: storeAndForwardEndpoint.enabled=false

Exchange: storeAndForward.enabled=false

Messages from the Exchange will not be stored in the Clients Store and Forward queues

Configure Store and Forward for particular message types

To enable a Store and Forward queue for particular message types, use the property storeAndForward.msgTypesRequiredForDelivery=F,G.

For example, Client A may only want to store Order Cancel Request (MsgType = F) messages. In this case, a message type should be specified for storage in the Store and Forward queue for a Client's session.

s_fix_Client_A.properties
sessionType=acceptor
senderCompID=FIXEdgeJCompID
targetCompID=ClientA
groups=Group_A
storeAndForward.enabled=true
storeAndForward.msgTypesRequiredForDelivery=F

Store and Forward functionality for a particular tag value

Filtering messages by particular tag values can be configured using business rules. There is a special function, 'requiredForDelivery', that enables message saving in the Store and Forward queue.

If the "requiredForDelivery" parameter is set to true or not defined, the message will be saved and delivered regardless.

...
def requiredForDelivery = false
if(msg.getTagValueAsDouble(44) > 1000 && msg.getTagValueAsString(55) == "AAPL") {
    requiredForDelivery = true
}
send(routingContext, messageContext, destination, requiredForDelivery)
...

Configure handling 'stale' messages

FIXEdge/J provides the ability to remove 'stale' orders from the Store and Forward queue. The period after which an order becomes outdated can be defined for each session. In addition, there is an option to exclude some message from the set of 'stale' messages.

For example, Client A may want to define all messages that are older than 30 sec at the sending time as 'stale'. At the same time, Client A wants Order Cancel Request (MsgType = F) messages to be always processed to the Exchange.

There are two properties:

  • storeAndForward.stalePeriod property defines a period (in seconds) that is used to check if the message has become 'stale' and should not be sent. If the property is set, all queued messages will be checked when a destination becomes available. In addition, if the difference between TransactTime (Tag = 60) and sending time is more that defined the period, messages will not be sent to the destination. 
  • storeAndForward.staleProcessingExcludedTypes property specifies the types of messages that the stale message checking and removal should not be applied to.


s_fix_Client_A.properties
sessionType=acceptor
senderCompID=FIXEdgeJCompID
targetCompID=ClientA
groups=Group_A
storeAndForward.enabled=true
storeAndForward.stalePeriod=30
storeAndForward.staleProcessingExcludedTypes=F

Configure Store and Forward persistence mode

By default, the Store and Forward queue is persisted on the disk in the 'logs' directory.

In addition, there is a possibility to configure the messages in the memory using the storeAndForwardEndpoint.queueType parameter.

Possible values: 

  • storeAndForwardEndpoint.queueType = IN_MEMORY - In this mode, all messages provided are saved in memory. If the system is down (gracefully or non-gracefully) all the messages will be lost.
  • storeAndForwardEndpoint.queueType = PERSISTENT - In this mode, all messages provided are saved on the disk and will be restored and sent in case of shutdown (gracefully or non-gracefully).
  • storeAndForwardEndpoint.queueType = REPLICATED - In this mode all messages will be stored/replicated with the Persistent API functionality.

Configure Store and Forward processing mode

By default, FIXEdge/J processes messages in an asynchronous mode.

In the case when synchronous processing is needed, there is a parameter that allows processing message in this mode - storeAndForwardEndpoint.maxTimeForSyncSendingInUsec.

Please note, that this parameter will be applied only if storeAndForwardEndpoint.enabled=true.


s_fix_Client_B.properties
sessionType=acceptor
senderCompID=FIXEdgeJCompID
targetCompID=ClientB
groups=Group_A
storeAndForwardEndpoint.enabled=true
storeAndForwardEndpoint.maxTimeForSyncSendingInUsec = 60000000

It is also possible to use this parameter with slow consumer sessions. Reducing the value of the parameter allows starting sending in the synchronous mode. If the message wasn't processed in the defined period (in microseconds), FIXEdge/J will switch to asynchronous mode.


s_fix_Client_B.properties
sessionType=acceptor
senderCompID=FIXEdgeJCompID
targetCompID=ClientB
groups=Group_A
storeAndForwardEndpoint.enabled=true
storeAndForwardEndpoint.maxTimeForSyncSendingInUsec = 100

Managing messages in the queue

Description

It is possible to manage the store and forward queue to iterate through, remove, or add messages.

Methods

Method name

Description

Example

clear()Removes all messages

Iterator<MessageEvent> iterator()

Iterates through all the elements in the queue
for (Iterator<MessageEvent> iterator = queue.iterator(); iterator.hasNext(); ) {
MessageEvent nextEvent = iterator.next();
if (<some condition>) {
iterator.remove();
}
}
isEmpty()Returns true if the queue is empty
MessageEvent peek()Gets the first element without removing it
remove(MessageEvent messageEvent)Removes the specified message event
MessageEvent first = queue.peek();
queue.remove(first);

Example of a rule for removing queued orders by cancel request

        rule("Remove queued order by order cancel request")
                .condition({
                    ctx -> stringValue(ctx.getMessage(), 35) == "F" //filter by OrderCancelRequest msg type
                })
                .action({
                    ctx ->
                        def cancelRequest = ctx.getMessage()
                        def queue = getManageableQueueByDestinationId(routingContext, "Exchange")
                        def removedFromQueue = false
                        for (Iterator iterator = queue.iterator(); iterator.hasNext(); ) {
                            def queuedMessage = iterator.next().getMessage()
                            if (stringValue(queuedMessage, 37) == stringValue(cancelRequest, 41)) {
                                iterator.remove()
                                removedFromQueue = true
                                break
                            }
                        }
                        if (!removedFromQueue) {
                            //send cancellation to exchange
                        } else {
                            logger.info("Order with id '{}' has been removed from the queue by cancel request with id '{}'", stringValue(cancelRequest, 41), stringValue(cancelRequest, 37))
                        }
                        ctx.exit()
                })
        .build()
Rule for routing order messages
        rule("Route order messages to exchange")
                .condition({
                    ctx -> stringValue(ctx.getMessage(), 35) == "D" //filter by Order msg type
                })
                .action({
                    ctx ->
                        send(routingContext, ctx, "Exchange") // send a message to the destination
                        ctx.exit()
                })
                .build()



  • No labels