

Overview

The replication service maintains backup copies of Persistence API storages.
The replication service uses Aeron transport to transmit data to backup instances. Replication can work in synchronous or asynchronous mode. In synchronous mode, the storage sends a notification about every operation to the backups and waits for an acknowledgment. It blocks the calling thread until the acknowledgment is received or a predefined timeout expires.

The replication service consists of two parts: leader and backup. Depending on the instance role, you need to initialize and start one or the other.

Leader instance is responsible for

    • notifying backups about existing and new storages

    • delivering notifications about storage operations

    • handling synchronization requests

Backup instance is responsible for

    • creating required storages on backup

    • synchronizing storages

    • updating storage state (processing notifications about storage operations)

Quick setup

It is convenient to use replication together with ClusterManager. ClusterManager can start and stop the leader or backup replication instance depending on the node's role in the cluster. ClusterManager can also automatically notify the replication service about available nodes and their addresses.
To use the replication service with ClusterManager, implement LocalNodeLeaderListener and LocalNodeBackupListener to receive and handle notifications about cluster state. You can use the default implementations of these listeners for the replication service. They handle all events and enable and configure the appropriate service depending on the node's role in the cluster.

Initialize replication leader


HazelcastCluster clusterManager = new HazelcastClusterManager();

// create controller for enable/disable leader replication role
ReplicationLeader replicationLeaderService = new ReplicationLeader(aeronTransport,correlationIdHolder,
        REPLICATION_LEADER_PORT,    										//(1)
        REPLICATION_BROADCAST_PORT, 										//(2)
        "./logs/" + name);

// register listener for enabling/disabling replication functionality for leader
LocalNodeLeaderListener leaderListener = new ReplicationLeaderListener(replicationLeaderService);
clusterManager.addLocalNodeLeaderListener(leaderListener);
  1. REPLICATION_LEADER_PORT - the port on which the leader listens for synchronization requests from backups.
  2. REPLICATION_BROADCAST_PORT - the port on which each backup node listens for replication data from the leader.

Initialize replication backup

// create controller for enable/disable backup replication role
ReplicationBackup replicationBackupService = new ReplicationBackup(aeronTransport, correlationIdHolder,
        REPLICATION_LEADER_PORT, REPLICATION_BROADCAST_PORT, "./logs/" + name);

// register listener for enabling/disabling replication functionality for backup
LocalNodeBackupListener backupListener = new ReplicationBackupListener(replicationBackupService);
clusterManager.addLocalNodeBackupListener(backupListener);

Creating storage directly

// Request instance of persistence factory
ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();

// Request a named persistent sequence with synchronous replication (replication timeout: 10 ms)
PersistentSequence<String> sequence = factory.getOrCreateSequence("seqName", indexer,
                    new StringSerializer(),
                    10, TimeUnit.MILLISECONDS);
// Request a named persistent queue with asynchronous replication (zero timeout)
PersistentQueue<String> queue = factory.getOrCreateQueue("queueName", new StringSerializer(),
                    0, TimeUnit.MILLISECONDS);

Creating storage with Persistence API

//Request instance of persistence factory
ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();
//Enable synchronous replication by default (replication timeout: 10 ms)
factory.setSyncMode(10, TimeUnit.MILLISECONDS);

// Request instance of named persistence sequence
PersistentSequence<String> sequence = factory.buildSequence("seqName", indexer, new StringSerializer());
// Request instance of named persistence queue
PersistentQueue<String> queue = factory.buildQueue("queueName", new StringSerializer());

Configuration

The replication service is configured in two files: the replication service configuration (replication.properties) and the Aeron Media Driver configuration (aeron.properties).

Replication Service configuration

The replication service configuration is stored in replication.properties.

  • fej.replication.leader.sync
    Default (initial) replication mode: true for synchronous, false for asynchronous.
    Default: false

  • fej.replication.leader.async.timeout
    Default (initial) timeout for synchronous replication, in milliseconds. The calling thread may block for up to this long while waiting for an acknowledgment from the other side.
    Default: 0 milliseconds (async mode)

  • fej.replication.leader.receive.buffer.size
    The size of the leader incoming ring buffer, must be power of 2.
    Default: 512 bytes

  • fej.replication.leader.receive.wait.strategy
    The wait strategy to use for the leader incoming ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.leader.send.buffer.size
    The size of the leader outgoing ring buffer, must be power of 2.
    Default: 2048 bytes

  • fej.replication.leader.send.wait.strategy
    The wait strategy to use for the leader outgoing ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.backup.receive.buffer.size
    The size of the backup incoming ring buffer, must be power of 2.
    Default: 1024 bytes

  • fej.replication.backup.receive.wait.strategy
    The wait strategy to use for the backup incoming ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.backup.send.buffer.size
    The size of the backup outgoing ring buffer, must be power of 2.
    Default: 512 bytes

  • fej.replication.backup.send.wait.strategy
    The wait strategy to use for the backup outgoing ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.aeron.mediadriver.embedded
    Use embedded aeron media driver (see Aeron Embedded Media Driver).
    Default: true

  • fej.replication.aeron.idle.strategy
    Provides an IdleStrategy for the thread responsible for communicating with the Aeron Media Driver (see Aeron Idle Strategies).
    Default: uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy
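Because every ring buffer size must be a power of 2, it can help to check the values before starting the service. The sketch below is only an illustration: the class ReplicationConfigSketch and its methods are hypothetical and not part of the replication service. It uses the property names and defaults listed above and does not show how the service itself reads replication.properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ReplicationConfigSketch {

    /** True when the value is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Reads a buffer size, falls back to the documented default, and validates it. */
    static int bufferSize(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key, String.valueOf(defaultValue)).trim();
        int size = Integer.parseInt(raw);
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException(key + " must be a power of 2, got " + size);
        }
        return size;
    }

    /** Parses properties from text; a real setup would read replication.properties instead. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "fej.replication.leader.send.buffer.size=4096\n"
              + "fej.replication.aeron.mediadriver.embedded=true\n");
        // Explicitly configured value.
        System.out.println(bufferSize(props, "fej.replication.leader.send.buffer.size", 2048));
        // Missing key: the documented default is used.
        System.out.println(bufferSize(props, "fej.replication.backup.receive.buffer.size", 1024));
    }
}
```

A value such as 1000 would be rejected with an IllegalArgumentException before it reaches the ring buffer.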

Aeron Media Driver configuration

For a description of the Aeron configuration options, see the official Aeron page.

Replication service lifecycle

The goal of the replication service is to maintain full copies of the storages within a network. To do this, it must support at least two operations:

  • synchronization of data to restore the current state

  • replication of data at runtime to maintain that state

The service can replicate data in two modes - synchronous and asynchronous.

Asynchronous replication does not wait for confirmation that the other side has received and processed a message. Replication is performed in parallel with the thread that changes the data in storages.

Synchronous replication expects confirmation of successful processing from at least one of the backup nodes. A processing timeout can be specified for a storage instance when it is created (otherwise the default settings are used). The thread that changes the data in storages is blocked until an acknowledgment is received or the timeout expires. In the latter case, a warning is logged to signal that the data was not successfully transmitted to any of the backup nodes within the specified period.

The replication mode is set separately for each storage, so storages with synchronous and asynchronous replication can be used at the same time.
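The difference between the two modes can be illustrated with a minimal sketch. PendingAck is a hypothetical helper, not a class of the replication service. It only models the waiting rule described above: a zero timeout means asynchronous mode and never blocks, while a positive timeout blocks the caller until an acknowledgment arrives or the timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ReplicationWaitSketch {

    /** Hypothetical handle that the transport would complete when a backup acknowledges. */
    static final class PendingAck {
        private final CountDownLatch latch = new CountDownLatch(1);

        /** Called when at least one backup confirms the operation. */
        void acknowledge() {
            latch.countDown();
        }

        /**
         * Returns true when the caller may continue without a warning:
         * either the mode is asynchronous (timeout == 0) or an ACK arrived in time.
         */
        boolean await(long timeout, TimeUnit unit) {
            if (timeout == 0) {
                return true; // asynchronous mode: do not block the calling thread
            }
            try {
                return latch.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
    }

    public static void main(String[] args) {
        PendingAck acked = new PendingAck();
        acked.acknowledge(); // simulate a backup confirming the operation
        System.out.println("acknowledged in time: " + acked.await(10, TimeUnit.MILLISECONDS));

        PendingAck lost = new PendingAck();
        if (!lost.await(10, TimeUnit.MILLISECONDS)) {
            System.out.println("WARN: no backup acknowledged within 10 ms");
        }
    }
}
```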

Each backup node maintains two transport channels, because Aeron transport is unidirectional by nature:

  • outgoing: for sending synchronization requests and synchronous acknowledgments

  • incoming: for receiving information about data changes in storages

A listening port is assigned to each node of the replication service depending on its role. The port assigned to a role does not change when nodes change roles: the leader always listens on the same port, so backups always know how to contact it. The same applies to the port that backups listen on.

Persistent storages are designed for incremental updates. The internal storage contains a log of operations such as 'APPEND' and 'REMOVE'. During synchronization or replication, only new updates need to be sent.
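The idea of sending only new updates can be sketched with a small in-memory operation log. Everything in this example is hypothetical: the class OperationLogSketch, the 1-based index, and the assumption that REMOVE drops the head of a queue are illustrative choices, not the actual storage format.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OperationLogSketch {

    enum Type { APPEND, REMOVE }

    /** One log entry; payload is null for REMOVE. */
    static final class Operation {
        final long index;
        final Type type;
        final String payload;

        Operation(long index, Type type, String payload) {
            this.index = index;
            this.type = type;
            this.payload = payload;
        }
    }

    private final List<Operation> log = new ArrayList<>();

    void append(String item) {
        log.add(new Operation(log.size() + 1, Type.APPEND, item));
    }

    void remove() {
        log.add(new Operation(log.size() + 1, Type.REMOVE, null));
    }

    /** Returns only the operations recorded after the given index. */
    List<Operation> updatesAfter(long lastKnownIndex) {
        List<Operation> updates = new ArrayList<>();
        for (Operation op : log) {
            if (op.index > lastKnownIndex) {
                updates.add(op);
            }
        }
        return updates;
    }

    /** Replays operations onto a queue; REMOVE is assumed to drop the head. */
    static void apply(Deque<String> queue, List<Operation> updates) {
        for (Operation op : updates) {
            if (op.type == Type.APPEND) {
                queue.addLast(op.payload);
            } else {
                queue.pollFirst();
            }
        }
    }

    public static void main(String[] args) {
        OperationLogSketch leader = new OperationLogSketch();
        leader.append("a");
        leader.append("b");
        leader.remove();
        leader.append("c");

        // The backup has already applied operations 1 and 2.
        Deque<String> backup = new ArrayDeque<>(List.of("a", "b"));
        apply(backup, leader.updatesAfter(2));
        System.out.println(backup);
    }
}
```

Only operations 3 and 4 travel to the backup in this run; the first two are never resent.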

Data synchronization procedure

After starting, the backup instance synchronizes its state with the leader.
The synchronization procedure is:

  1. The backup sends a GET_ALL_RESOURCES request to the leader right after start.

  2. The leader responds with lists of existing queues and sequences as 'RESOURCE_LIST' messages. A CorrelationId is also sent for every storage. The CorrelationId is a unique id of the storage and is used in all subsequent communication.

  3. The backup loads existing storages and creates the missing ones.

  4. The backup sends a SYNC_REQ request for every storage and passes its index. For a sequence, the timestamp of the last reset is passed as well.

  5. The leader answers with a SYNC_REQ_ACCEPTED message. It compares indexes and timestamps and resends all required data. For a sequence, it can also send a SEQUENCE_RESET message if a reset was missed. Finally, the leader sends a SYNC_FINISHED message to mark the end of the synchronization answer.
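Step 5 can be sketched for a single sequence as follows. This is an assumption-laden illustration, not the actual leader implementation. The class SyncResponseSketch and its parameters are hypothetical, messages are modelled as plain strings, and the source does not say which message type carries the resent data, so SEQUENCE_APPEND is used here as a guess.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncResponseSketch {

    /**
     * Builds the leader's answer to a SYNC_REQ for one sequence.
     * leaderItems holds the items appended since the leader's last reset;
     * the item at position i (0-based) is assumed to have index i + 1.
     */
    static List<String> answerSyncRequest(List<String> leaderItems, long leaderResetTimestamp,
                                          long backupIndex, long backupResetTimestamp) {
        List<String> answer = new ArrayList<>();
        answer.add("SYNC_REQ_ACCEPTED");

        // Clamp the backup index to the valid range of the leader's log.
        int from = (int) Math.max(0, Math.min(backupIndex, leaderItems.size()));
        if (backupResetTimestamp < leaderResetTimestamp) {
            answer.add("SEQUENCE_RESET"); // the backup missed a reset: start from scratch
            from = 0;
        }
        for (int i = from; i < leaderItems.size(); i++) {
            answer.add("SEQUENCE_APPEND index=" + (i + 1) + " data=" + leaderItems.get(i));
        }

        answer.add("SYNC_FINISHED"); // marks the end of the synchronization answer
        return answer;
    }

    public static void main(String[] args) {
        // The backup already has item 1 and saw the same reset as the leader.
        answerSyncRequest(List.of("a", "b", "c"), 100L, 1L, 100L).forEach(System.out::println);
    }
}
```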

Data replication procedure

On every operation with persistent storage, the leader sends updates to all backups.
The replication procedure is:

  1. When a new item is added to the internal storage, the leader sends a QUEUE_ADD or SEQUENCE_APPEND message. The new data and the internal ordered index are sent with this message.

  2. The backup receives this message and compares the expected index with the received one.

  3. If the received index differs from the expected one, the backup starts the synchronization procedure (see Data synchronization procedure, step 4).

  4. If the leader indicates that this is a synchronous storage and it expects an acknowledgment, the backup sends back an ACK message.
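The backup side of this procedure can be sketched as a small decision function. The class BackupReplicaSketch, the Reply enum, and the 1-based index are hypothetical and used only for illustration; ACK and SYNC_REQ are the message names used in this document.

```java
import java.util.ArrayList;
import java.util.List;

public class BackupReplicaSketch {

    /** What the backup sends back to the leader after a replication message. */
    enum Reply { NONE, ACK, SYNC_REQ }

    private final List<String> items = new ArrayList<>();

    /** Index the backup expects on the next message (1-based in this sketch). */
    long expectedIndex() {
        return items.size() + 1;
    }

    /** Handles a QUEUE_ADD or SEQUENCE_APPEND style message. */
    Reply onAppend(long index, String data, boolean ackExpected) {
        if (index != expectedIndex()) {
            // A message was missed or duplicated: fall back to synchronization.
            return Reply.SYNC_REQ;
        }
        items.add(data);
        // Acknowledge only when the leader marks the storage as synchronous.
        return ackExpected ? Reply.ACK : Reply.NONE;
    }

    public static void main(String[] args) {
        BackupReplicaSketch backup = new BackupReplicaSketch();
        System.out.println(backup.onAppend(1, "a", true));
        System.out.println(backup.onAppend(3, "c", true));
        System.out.println(backup.onAppend(2, "b", false));
    }
}
```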

FIX session replication

FIX Antenna Java can also use replicated storages through the Persistence API.

To enable replicated storage for a FIX session, set up the replication leader and backup (see Initialize replication leader and Initialize replication backup) and use the following configuration options for FIX Antenna (in fixengine.properties):

storageFactory=com.epam.fixengine.storage.persistence.PersistenceEnableFactory                                   //(1)
storage.persistenceFactoryBuilder=com.epam.fixengine.storage.persistence.ReplicatedPersistenceFactoryBuilder     //(2)
replicationTimeout=10                                                                                            //(3)
  1. Use PersistenceEnableFactory for the storageFactory property. This factory allows the Persistence API to be used for storing FIX session state. PersistenceEnableFactory is based on FilesystemStorageFactory and delegates all operations to Persistence API objects. To work with this API, it requires an implementation of PersistentFactoryBuilder, which should construct an instance of PersistenceFactory.
  2. Define ReplicatedPersistenceFactoryBuilder as the factory builder for PersistenceEnableFactory. ReplicatedPersistenceFactoryBuilder implements PersistentFactoryBuilder and builds a replicated factory instance. It uses the replicationTimeout option from the FIX Antenna config (or from the session's Configuration instance) to configure the synchronous or asynchronous replication mode for the FIX session.
  3. Define the replication timeout in milliseconds. A zero value enables asynchronous replication for the FIX session.

