Cluster implementation is scalable, customizable, and provides several endpoints for integration.
Replication is especially useful together with ClusterManager.
ClusterManager can start and stop the leader or a backup replication instance depending on the node's role in the cluster.
ClusterManager can also automatically notify the replication service about available nodes and their addresses.
To use the replication service with ClusterManager, you need to implement LocalNodeLeaderListener and LocalNodeBackupListener to receive and handle notifications about the cluster's state. You can use the default implementations of these listeners for the replication service. They handle all events, and enable and configure the appropriate service depending on the node's role in the cluster.
    HazelcastCluster clusterManager = new HazelcastClusterManager();

    // create controller for enable/disable leader replication role
    ReplicationLeader replicationLeaderService = new ReplicationLeader(aeronTransport, correlationIdHolder,
            REPLICATION_LEADER_PORT,    //(1)
            REPLICATION_BROADCAST_PORT, //(2)
            "./logs/" + name);

    // register listener for enabling/disabling replication functionality for leader
    LocalNodeLeaderListener leaderListener = new ReplicationLeaderListener(replicationLeaderService);
    clusterManager.addLocalNodeLeaderListener(leaderListener);

    // create controller for enable/disable backup replication role
    ReplicationBackup replicationBackupService = new ReplicationBackup(aeronTransport, correlationIdHolder,
            REPLICATION_LEADER_PORT,
            REPLICATION_BROADCAST_PORT,
            "./logs/" + name);

    // register listener for enabling/disabling replication functionality for backup
    LocalNodeBackupListener backupListener = new ReplicationBackupListener(replicationBackupService);
    clusterManager.addLocalNodeBackupListener(backupListener);

    // Request instance of persistence factory
    ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();

    // Request instance of named persistence sequence with synchronous replication (replication timeout - 10 millis)
    PersistentSequence<String> sequence = factory.getOrCreateSequence("seqName", indexer,
            new StringSerializer(), 10, TimeUnit.MILLISECONDS);

    // Request instance of named persistence queue with asynchronous replication
    PersistentQueue<String> queue = factory.getOrCreateQueue("queueName",
            new StringSerializer(), 0, TimeUnit.MILLISECONDS);

    // Request instance of persistence factory
    ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();

    // Enable synchronous replication by default (replication timeout - 10 millis)
    factory.setSyncMode(10, TimeUnit.MILLISECONDS);

    // Request instance of named persistence sequence
    PersistentSequence<String> sequence = factory.buildSequence("seqName", indexer, new StringSerializer());

    // Request instance of named persistence queue
    PersistentQueue<String> queue = factory.buildQueue("queueName", new StringSerializer());
The goal of a replication service is to maintain full copies of the storages within a network. To do this, it should support at least 2 operations:
synchronization of data to restore the actual state
replication of data in runtime to maintain the state
The service can replicate data in two modes - synchronous and asynchronous.
Sending messages asynchronously means that the sender does not wait for confirmation that the other side has successfully received and processed them. Replication is performed in parallel with the thread that changes the data in the storages.
Synchronous replication waits for confirmation of successful processing from at least one of the backup nodes. A processing timeout can be specified for a storage instance when it is created; otherwise, the default settings are used. The thread that changes the data in the storage is blocked until an acknowledgment is received or the timeout expires. In the latter case, a warning is logged stating that the data was not successfully transmitted to any of the backup nodes within the specified period.
The replication mode is set separately for each storage, so a storage with synchronous replication and a storage with asynchronous replication can be used at the same time.
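The difference between the two modes can be sketched in plain Java. This is an illustration only, not the replication service's code: `ReplicationModeSketch`, `replicate`, and the `BiConsumer`-based transport are hypothetical names, and treating a timeout of 0 as asynchronous mode is an assumption based on the queue example earlier in this section.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical sketch of synchronous vs. asynchronous replication of one update. */
class ReplicationModeSketch {

    /**
     * Hands an update to the backups and, in synchronous mode, waits for the first acknowledgment.
     *
     * @param transport     delivers the update together with a callback the backup runs to acknowledge it
     * @param timeoutMillis 0 means asynchronous mode (assumption); otherwise the acknowledgment timeout
     * @return true if no acknowledgment was required or one arrived in time
     */
    static boolean replicate(String update, BiConsumer<String, Runnable> transport, long timeoutMillis) {
        CountDownLatch firstAck = new CountDownLatch(1);
        transport.accept(update, firstAck::countDown);

        if (timeoutMillis == 0) {
            return true; // asynchronous: the writing thread does not wait
        }
        try {
            // synchronous: block the writing thread until one backup confirms or the timeout expires
            boolean acked = firstAck.await(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!acked) {
                System.err.println("WARN: no backup acknowledged the update within " + timeoutMillis + " ms");
            }
            return acked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Note that a timeout in synchronous mode only produces a warning here, mirroring the behavior described above; the update itself is not rolled back.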
Each backup node maintains two transport channels because Aeron transport is unidirectional by its nature:
outgoing: for sending synchronization requests and synchronous acknowledgments
incoming: for receiving information about data changes in the storages
Each node of the replication service is assigned a listening port according to its role. The port belongs to the role, not to the node: when a node's role changes, the ports themselves do not change. The leader always listens on the same port, so the backups always know how to contact it. The same applies to the port that the backups listen on.
Persistent storages are designed for incremental updates. The internal storage contains a log of operations such as 'APPEND' and 'REMOVE'. During synchronization or replication, only the new updates need to be sent.
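The idea of an operation log that ships only new entries can be sketched as follows. The class and method names (`OperationLogSketch`, `record`, `updatesSince`) are hypothetical and the in-memory list stands in for the real on-disk format, which is not described here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of an append-only operation log; not the library's storage format. */
class OperationLogSketch {

    enum Type { APPEND, REMOVE }

    /** One logged operation; the index is its position in the log. */
    static final class Operation {
        final long index;
        final Type type;
        final String payload;

        Operation(long index, Type type, String payload) {
            this.index = index;
            this.type = type;
            this.payload = payload;
        }
    }

    private final List<Operation> log = new ArrayList<>();

    /** Records an operation and returns the index assigned to it. */
    long record(Type type, String payload) {
        long index = log.size();
        log.add(new Operation(index, type, payload));
        return index;
    }

    /** Returns only the operations a peer has not seen yet, given the next index it expects. */
    List<Operation> updatesSince(long nextExpectedIndex) {
        int from = (int) Math.min(Math.max(nextExpectedIndex, 0), log.size());
        return new ArrayList<>(log.subList(from, log.size()));
    }
}
```

A peer that already holds the first operation would call `updatesSince(1)` and receive only the operations recorded after it.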
After starting, the backup instance synchronizes its state with the leader.
The synchronization procedure is:
1. The backup sends a GET_ALL_RESOURCES request to the leader right after it starts.
2. The leader responds with lists of the existing queues and sequences as 'RESOURCE_LIST' messages. A CorrelationId is also sent for every storage. The CorrelationId is a unique id of a storage and is used in all subsequent communication.
3. The backup loads the storages that already exist locally and creates the missing ones.
4. The backup sends a SYNC_REQ request for every storage and passes its index. For a sequence, the timestamp of the last reset is passed as well.
5. The leader sends a SYNC_REQ_ACCEPTED message in response. It compares indexes and timestamps, and resends all the required data. For a sequence, it can also send a SEQUENCE_RESET message if a reset was missed. At the end, the leader sends a SYNC_FINISHED message to mark the end of the synchronization response.
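The leader's answer to a single SYNC_REQ can be sketched as below. Only the message names come from the procedure above; everything else is an assumption made for illustration: messages are modeled as plain strings, missing items are resent as SEQUENCE_APPEND messages, and a missed reset causes the whole sequence to be resent. `SyncResponderSketch` and `answerSyncRequest` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the leader's answer to one SYNC_REQ; the wire format is an assumption. */
class SyncResponderSketch {

    /**
     * @param items           items currently stored in the leader's sequence, in index order
     * @param leaderResetTime timestamp of the leader's last reset
     * @param backupNextIndex index of the next item the backup expects
     * @param backupResetTime timestamp of the last reset known to the backup
     * @return the messages the leader would send, in order
     */
    static List<String> answerSyncRequest(List<String> items, long leaderResetTime,
                                          long backupNextIndex, long backupResetTime) {
        List<String> out = new ArrayList<>();
        out.add("SYNC_REQ_ACCEPTED");

        long from = Math.max(backupNextIndex, 0);
        if (backupResetTime != leaderResetTime) {
            // The backup missed a reset: announce it and resend everything (assumption).
            out.add("SEQUENCE_RESET " + leaderResetTime);
            from = 0;
        }
        for (long i = from; i < items.size(); i++) {
            out.add("SEQUENCE_APPEND " + i + " " + items.get((int) i));
        }

        out.add("SYNC_FINISHED"); // marks the end of the synchronization response
        return out;
    }
}
```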
For every operation with persistent storage, the leader sends updates to all backups.
The replication procedure is:
1. After adding a new item to the internal storage, the leader sends a SEQUENCE_APPEND message. The new data and an internal ordered index are sent with this message.
2. The backup receives this message and compares the expected index with the received one.
3. If the received index is different than expected, the backup starts the synchronization procedure (see Data synchronization procedure, step 4).
4. If the leader indicates that this is a synchronous storage and expects an acknowledgment, the backup sends back an acknowledgment.
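The backup's side of the replication procedure can be sketched as a small index check. `BackupApplierSketch`, `onSequenceAppend`, and the `Result` values are hypothetical names; the sketch only returns what the backup would do next instead of sending real messages.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how a backup might handle SEQUENCE_APPEND updates. */
class BackupApplierSketch {

    enum Result { APPLIED, APPLIED_AND_ACKED, RESYNC_REQUESTED }

    private final List<String> items = new ArrayList<>();

    /** The index the next update is expected to carry. */
    long expectedIndex() {
        return items.size();
    }

    /**
     * @param index       ordered index sent by the leader with the update
     * @param data        the new item
     * @param ackExpected true if the leader marked the storage as synchronous
     */
    Result onSequenceAppend(long index, String data, boolean ackExpected) {
        if (index != expectedIndex()) {
            // An update was missed or duplicated: do not apply it, resynchronize via SYNC_REQ instead.
            return Result.RESYNC_REQUESTED;
        }
        items.add(data);
        return ackExpected ? Result.APPLIED_AND_ACKED : Result.APPLIED;
    }
}
```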
FIX Antenna Java can also use replicated storages through the Persistence API.
To enable replicated storage for a FIX session, set up a replication leader and a backup (see Initialize replication leader, Initialize replication backup), and use the following configuration options for FIX Antenna (in
    storageFactory=com.epam.fixengine.storage.persistence.PersistenceEnableFactory //(1)
    storage.persistenceFactoryBuilder=com.epam.fixengine.storage.persistence.ReplicatedPersistenceFactoryBuilder //(2)
    replicationTimeout=10 //(3)
storageFactory property. This factory allows using the Persistence API to store a FIX session's state.
PersistenceEnableFactory is based on FilesystemStorageFactory and delegates all operations to Persistence API objects. Working with this API requires an implementation of PersistentFactoryBuilder. The latter should construct an instance of
ReplicatedPersistenceFactoryBuilder like a factory builder for
PersistentFactoryBuilder and builds a replicated instance of the factory. It uses the replicationTimeout option from the FIX Antenna config (or from the session's Configuration instance) to configure synchronous or asynchronous replication for the FIX session.
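How a timeout value might be turned into a replication mode can be sketched as follows. This is not the ReplicatedPersistenceFactoryBuilder code: `ReplicationTimeoutSketch` and its methods are hypothetical, and the sketch assumes that the value is in milliseconds, that a missing option defaults to 0, and that 0 selects asynchronous replication (as in the queue example earlier in this section).

```java
import java.util.Properties;

/** Hypothetical sketch of mapping the replicationTimeout option to a replication mode. */
class ReplicationTimeoutSketch {

    /** Reads replicationTimeout as milliseconds (assumed unit); a missing option is treated as 0. */
    static long readTimeoutMillis(Properties config) {
        return Long.parseLong(config.getProperty("replicationTimeout", "0").trim());
    }

    /** A positive timeout selects synchronous replication; 0 selects asynchronous (assumption). */
    static boolean isSynchronous(long timeoutMillis) {
        return timeoutMillis > 0;
    }
}
```

With `replicationTimeout=10`, as in the configuration above, this sketch would select synchronous replication with a 10 ms acknowledgment timeout.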