+
+ final boolean hasLeader = hasLeader();
+ if (!hasLeader) {
+ // No leader implies we are not the leader, lose frontend state if we have any. This also places
+ // an explicit guard so the map will not get modified accidentally.
+ if (!knownFrontends.isEmpty()) {
+ LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+ knownFrontends = ImmutableMap.of();
+ }
+ return;
+ }
+
+ if (!isLeader()) {
+ // Another leader was elected. If we were the previous leader and had pending transactions, convert
+ // them to transaction messages and send to the new leader.
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ Collection<?> messagesToForward = convertPendingTransactionsToMessages();
+
+ if (!messagesToForward.isEmpty()) {
+ LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
+ messagesToForward.size(), leader);
+
+ for (Object message : messagesToForward) {
+ leader.tell(message, self());
+ }
+ }
+ } else {
+ commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ + "change and the leader address isn't available.", this);
+ }
+ } else {
+ // We have become the leader, we need to reconstruct frontend state
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
+ }
+
+ if (!isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
+ }
+
+ /**
+ * Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+ *
+ * @return the converted messages
+ */
+ public Collection<?> convertPendingTransactionsToMessages() {
+ return commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
+ }
+
+ @Override
+ protected void pauseLeader(final Runnable operation) {
+ LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ store.setRunOnPendingTransactionsComplete(operation);
+ }
+
+ @Override
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+ .dataChangeListenerActors(changeSupport.getListenerActors())
+ .commitCohortActors(store.getCohortActors());