+ @Override
+ protected final Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
+ final String getShardName() {
+ return shardName;
+ }
+
+ @Override
+ protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ final short leaderPayloadVersion) {
+ return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
+ : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
+ }
+
+ private void onDatastoreContext(final DatastoreContext context) {
+ datastoreContext = verifyNotNull(context);
+
+ setTransactionCommitTimeout();
+
+ setPersistence(datastoreContext.isPersistent());
+
+ updateConfigParams(datastoreContext.getShardRaftConfig());
+ }
+
+ // applyState() will be invoked once consensus is reached on the payload
+ // non-final for mocking
+ void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
+ final boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+ if (canSkipPayload) {
+ applyState(self(), id, payload);
+ } else {
+ // We are faking the sender
+ persistData(self(), id, payload, batchHint);
+ }
+ }
+
+ private void handleCommitTransaction(final CommitTransaction commit) {
+ final TransactionIdentifier txId = commit.getTransactionId();
+ if (isLeader()) {
+ askProtocolEncountered(txId);
+ commitCoordinator.handleCommit(txId, getSender(), this);
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader == null) {
+ messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
+ } else {
+ LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
+ leader.forward(commit, getContext());
+ }