+ pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
+ @Override
+ protected void doRun() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+
+ @Override
+ protected void doCancel() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ });
+ }
+ }
+
+ private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+ LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
+ Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+ if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+ && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
+ roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+ getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
+ }
+ }
+
+ private void switchBehavior(SwitchBehavior message) {
+ if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+ RaftState newState = message.getNewState();
+ if (newState == RaftState.Leader || newState == RaftState.Follower) {
+ switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
+ AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
+ getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
+ } else {
+ LOG.warn("Switching to behavior : {} - not supported", newState);
+ }
+ }
+ }
+
+ private void switchBehavior(final BehaviorState oldBehaviorState, final RaftActorBehavior nextBehavior) {
+ setCurrentBehavior(nextBehavior);
+ handleBehaviorChange(oldBehaviorState, nextBehavior);
+ }
+
+ @VisibleForTesting
+ RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+ return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
+ }
+
+ private void onGetOnDemandRaftStats() {
+ // Debugging message to retrieve raft stats.
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ Map<String, Boolean> peerVotingStates = new HashMap<>();
+ for (PeerInfo info: context.getPeers()) {
+ peerVotingStates.put(info.getId(), info.isVoting());
+ peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
+ }
+
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+ OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
+ .commitIndex(context.getCommitIndex())
+ .currentTerm(context.getTermInformation().getCurrentTerm())
+ .inMemoryJournalDataSize(replicatedLog().dataSize())
+ .inMemoryJournalLogSize(replicatedLog().size())
+ .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
+ .lastApplied(context.getLastApplied())
+ .lastIndex(replicatedLog().lastIndex())
+ .lastTerm(replicatedLog().lastTerm())
+ .leader(getLeaderId())
+ .raftState(currentBehavior.state().toString())
+ .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
+ .snapshotIndex(replicatedLog().getSnapshotIndex())
+ .snapshotTerm(replicatedLog().getSnapshotTerm())
+ .votedFor(context.getTermInformation().getVotedFor())
+ .isVoting(context.isVotingMember())
+ .peerAddresses(peerAddresses)
+ .peerVotingStates(peerVotingStates)
+ .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
+
+ ReplicatedLogEntry lastLogEntry = replicatedLog().last();
+ if (lastLogEntry != null) {
+ builder.lastLogIndex(lastLogEntry.getIndex());
+ builder.lastLogTerm(lastLogEntry.getTerm());
+ }
+
+ if (getCurrentBehavior() instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
+ Collection<String> followerIds = leader.getFollowerIds();
+ List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
+ for (String id: followerIds) {
+ final FollowerLogInformation info = leader.getFollower(id);
+ followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+ context.getPeerInfo(info.getId()).isVoting()));