+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {
+ handleBatchedModificationsLocal(batched, getSender());
+ } else {
+ ActorSelection leader = getLeader();
+ if (!isLeaderActive || leader == null) {
+ messageRetrySupport.addMessageToRetry(batched, getSender(),
+ "Could not commit transaction " + batched.getTransactionID());
+ } else {
+ // If this is not the first batch and leadership changed in between batched messages,
+ // we need to reconstruct previous BatchedModifications from the transaction
+ // DataTreeModification, honoring the max batched modification count, and forward all the
+ // previous BatchedModifications to the new leader.
+ Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
+ batched, datastoreContext.getShardBatchedModificationCount());
+
+ LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
+ newModifications.size(), leader);
+
+ for(BatchedModifications bm: newModifications) {
+ leader.forward(bm, getContext());
+ }
+ }
+ }
+ }
+
+ private boolean failIfIsolatedLeader(ActorRef sender) {
+ if(isIsolatedLeader()) {
+ sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Shard %s was the leader but has lost contact with all of its followers. Either all" +
+ " other follower nodes are down or this node is isolated by a network partition.",
+ persistenceId()))), getSelf());
+ return true;
+ }
+
+ return false;
+ }
+
+ protected boolean isIsolatedLeader() {
+ return getRaftState() == RaftState.IsolatedLeader;
+ }
+
+ private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+ LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
+
+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {