+
+ boolean hasLeader = hasLeader();
+ if(hasLeader && !isLeader()) {
+ // Another leader was elected. If we were the previous leader and had pending transactions, convert
+ // them to transaction messages and send to the new leader.
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ Collection<Object> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
+
+ if(!messagesToForward.isEmpty()) {
+ LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
+ messagesToForward.size(), leader);
+
+ for(Object message: messagesToForward) {
+ leader.tell(message, self());
+ }
+ }
+ } else {
+ commitCoordinator.abortPendingTransactions(
+ "The transacton was aborted due to inflight leadership change and the leader address isn't available.",
+ this);
+ }
+ }
+
+ if(hasLeader && !isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
+ }
+
+ @Override
+ protected void pauseLeader(Runnable operation) {
+ LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ commitCoordinator.setRunOnPendingTransactionsComplete(operation);