+ paused = false;
+
+ if (!isLeader()) {
+ if (!knownFrontends.isEmpty()) {
+ LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+ knownFrontends = ImmutableMap.of();
+ }
+
+ requestMessageAssembler.close();
+
+ if (!hasLeader()) {
+ // No leader anywhere, nothing else to do
+ return;
+ }
+
+ // Another leader was elected. If we were the previous leader and had pending transactions, convert
+ // them to transaction messages and send to the new leader.
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ Collection<?> messagesToForward = convertPendingTransactionsToMessages();
+
+ if (!messagesToForward.isEmpty()) {
+ LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
+ messagesToForward.size(), leader);
+
+ for (Object message : messagesToForward) {
+ LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
+ leader.tell(message, self());
+ }
+ }
+ } else {
+ commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ + "change and the leader address isn't available.", this);
+ }
+ } else {
+ // We have become the leader, we need to reconstruct frontend state
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
+ }
+
+ if (!isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
+ }
+
+ /**
+ * Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+ *
+ * @return the converted messages
+ */
+ public Collection<?> convertPendingTransactionsToMessages() {
+ return commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
+ }
+
+ @Override
+ protected void pauseLeader(final Runnable operation) {
+ LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ paused = true;
+
+ // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+
+ store.setRunOnPendingTransactionsComplete(operation);
+ }
+
+ @Override
+ protected void unpauseLeader() {
+ LOG.debug("{}: In unpauseLeader", persistenceId());
+ paused = false;
+
+ store.setRunOnPendingTransactionsComplete(null);
+
+ // Restore tell-based protocol state as if we were becoming the leader
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ }
+
+ @Override
+ protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
+ return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+ .commitCohortActors(store.getCohortActors());