Cluster implementation is scalable, customizable, and provides several endpoints for integration.
It is very useful to use replication with
ClusterManager allows starting and stopping the leader or a backup replication instance depending on the node's role in the cluster.
ClusterManager can also automatically notify the replication service about available nodes and their address.
To use the replication service with Cluster Manager, you need to implement
LocalNodeBackupListener to receive and handle notifications about the cluster's state. You can use default implementations of these listeners for the replication service. They will handle all events, and enable and configure appropriate service depending on the node's role in the cluster.
Initialize replication leader
HazelcastCluster clusterManager = new HazelcastClusterManager(); // create controller for enable/disable leader replication role ReplicationLeader replicationLeaderService = new ReplicationLeader(aeronTransport,correlationIdHolder, REPLICATION_LEADER_PORT, //(1) REPLICATION_BROADCAST_PORT, //(2) "./logs/" + name); // register listener for enabling/disabling replication functionality for leader LocalNodeLeaderListener leaderListener = new ReplicationLeaderListener(replicationLeaderService); clusterManager.addLocalNodeLeaderListener(leaderListener);
- REPLICATION_LEADER_PORT - a port that the leader is listening for a synchronization request from backups.
- REPLICATION_BROADCAST_PORT - a port that each backup node is listening for replication data from the leader.
Initialize replication backup
// create controller for enable/disable backup replication role ReplicationBackup replicationBackupService = new ReplicationBackup(aeronTransport, correlationIdHolder, REPLICATION_LEADER_PORT, REPLICATION_BROADCAST_PORT, "./logs/" + name); // register listener for enabling/disabling replication functionality for backup LocalNodeBackupListener backupListener = new ReplicationBackupListener(replicationBackupService); clusterManager.addLocalNodeBackupListener(backupListener);
Creating storage directly
// Request instance of persistence factory ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get(); // Request instance of named persistence sequence with synchronouse replication (replication timeout - 10 millis) PersistentSequence<String> sequence = factory.getOrCreateSequence("seqName", indexer, new StringSerializer(), 10, TimeUnit.MILLISECONDS); // Request instance of named persistence queue with asynchronouse replication PersistentQueue<String> queue = factory.getOrCreateQueue("queueName", new StringSerializer(), 0, TimeUnit.MILLISECONDS);
Creating storage with Persistence API
//Request instance of persistence factory ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get(); //Enable synchronious replication by-default (replication timeout - 10 millis) factory.setSyncMode(10, TimeUnit.MILLISECONDS); // Request instance of named persistence sequence PersistentSequence<String> sequence = factory.buildSequence("seqName", indexer, new StringSerializer()); // Request instance of named persistence queue PersistentQueue<String> queue = factory.buildQueue("queueName", new StringSerializer());
Replication service lifecycle
The goal of a replication service is to maintain full copies of the storages within a network. To do this, it should support at least 2 operations:
synchronization of data to restore the actual state
replication of data in runtime to maintain the state
The service can replicate data in two modes - synchronous and asynchronous.
Sending messages asynchronously means that confirmation of the successful receipt and processing of messages by the other side is not expected. Replication is performed in parallel with the thread, which changes the data in the storages.
Asynchronous replication expects to receive confirmation about the successful processing, at least, from one of the backup nodes. A processing timeout can be specified for the storage instance during its creation. Otherwise, the default settings will be used. The thread that changes the data in the storages is blocked until receiving acknowledgment or until the timeout expires. In the latter case, the warning that signals that the data was not successfully transmitted to any of the backup nodes during the specified period will be logged.
The replication mode is set separately for each of the repositories so it is possible to simultaneously use both a storage with synchronous replication and a storage with asynchronous replication.
Each backup node maintains two transport channels because Aeron transport is unidirectional by its nature:
incoming: for sending synchronization requests and synchronous acknowledgments
outgoing: for receiving information about changing the data in the storages
The listening port is assigned to each node of the replication service, depending on its role. If you change the node’s role, the listening port is not changed. This same port will be always listened by the leader and the backups will always know how to contact it. Same for the port that is listened by the backups.
Persistent storages are designed for incremental updates. Internal storage contains a log of operations like 'APPEND' and 'REMOVE'. In case of the synchronization or replication of data, it only needs to send new updates.
Data synchronization procedure
After starting, the backup instance synchronizes its state with the leader.
The synchronization procedure is:
The backup sends a
GET_ALL_RESOURCESrequest to the leader right after its start.
The leader sends lists of existing queries and sequences in response as 'RESOURCE_LIST' messages. A
CorrelationIdis also sent for every storage. The
CorrelationIdis a unique id for storage. It is used in the following communications.
The backup loads exist and create non-exist storages.
The backup sends a
SYNC_REQrequest for every storage and passes its index. The timestamp of the last reset is passed for a sequence in addition.
The leader sends a
SYNC_REQ_ACCEPTEDmessage in response. It compares indexes and timestamps, and resends all the required data. For the sequence, it can also send a
SEQUENCE_RESETmessage if the reset was missed. At the end, the leader sends a
SYNC_FINISHEDmessage to indicate the border of synchronization answer.
Data replication procedure
For every operation with persistent storage, the leader sends updates to all backups.
The replication procedure is:
Upon adding the new item to internal storage, the leader sends a
SEQUENCE_APPENDmessage. New data and an internal ordered index are sent with this message.
The backup receives this message and compares the expected index with the received one.
If the received index is different then expected, it starts a synchronization procedure (see Data synchronization procedure, n.4)
If the leader indicates that this is synchronous storage and expects acknowledgment, the backup sends back an
FIX session replication
FIX Antenna Java can also use replication storages through the Persistence API.
To enable replicated storage for a FIX session, it needs to setup a replication leader and a backup (Initialize replication leader, Initialize replication backup), and use the following configuration options for FIX antenna (in
storageFactory=com.epam.fixengine.storage.persistence.PersistenceEnableFactory //(1) storage.persistenceFactoryBuilder=com.epam.fixengine.storage.persistence.ReplicatedPersistenceFactoryBuilder //(2) replicationTimeout=10 //(3)
storageFactoryproperty. This factory allows using the Persistence API for storing a FIX session's state.
PersistenceEnableFactoryis based on
FilesystemStorageFactoryand delegates all operation Persistence API objects. Working with this API requires the implementation of
PersistentFactoryBuilder. The last one should construct an instance of
ReplicatedPersistenceFactoryBuilderlike a factory builder for
PersistentFactoryBuilderand builds a replicated instance of the factory. It uses
replicationTimeoutoptions from the FIX Antenna config (or from the session’s
Configurationinstance) to configure synchronous or asynchronous replication for the FIX session.
- Define replication timeout in milliseconds. A zero value will enable asynchronous replication for a FIX session.