+ paused = false;
+
+ if (!isLeader()) {
+ if (!knownFrontends.isEmpty()) {
+ LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+ knownFrontends = ImmutableMap.of();
+ }
+
+ requestMessageAssembler.close();
+
+ if (!hasLeader()) {
+ // No leader anywhere, nothing else to do
+ return;
+ }
+
+ // Another leader was elected. If we were the previous leader and had pending transactions, convert
+ // them to transaction messages and send to the new leader.
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ // Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+ Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
+
+ if (!messagesToForward.isEmpty()) {
+ LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
+ messagesToForward.size(), leader);
+
+ for (Object message : messagesToForward) {
+ LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
+ leader.tell(message, self());
+ }
+ }
+ } else {
+ commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ + "change and the leader address isn't available.", this);
+ }
+ } else {
+ // We have become the leader, we need to reconstruct frontend state
+ knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
+ LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
+ }
+
+ if (!isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
+ }
+
+ @Override
+ protected final void pauseLeader(final Runnable operation) {
+ LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ paused = true;
+
+ // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+
+ store.setRunOnPendingTransactionsComplete(operation);
+ }
+
+ @Override
+ protected final void unpauseLeader() {
+ LOG.debug("{}: In unpauseLeader", persistenceId());
+ paused = false;
+
+ store.setRunOnPendingTransactionsComplete(null);
+
+ // Restore tell-based protocol state as if we were becoming the leader
+ knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));