Add voting state to shard mbean FollowerInfo
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index a5b5d2b81aa3d78f4e1bc91bd81d1abdd50eef03..933fde98b9b432f60b545541d0b05cc03ef8da0d 100644 (file)
@@ -12,7 +12,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
-import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -52,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -205,7 +205,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(state, newBehavior);
     }
 
+    /**
+     * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)}
+     * for messages which are not handled by this class. Subclasses overriding this class should fall back to this
+     * implementation for messages which they do not handle
+     *
+     * @param message Incoming command message
+     */
+    protected void handleNonRaftCommand(final Object message) {
+        unhandled(message);
+    }
+
+    /**
+     * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
+     * {@link #handleNonRaftCommand(Object)} instead.
+     */
+    @Deprecated
     @Override
+    // FIXME: make this method final once our unit tests do not need to override it
     protected void handleCommand(final Object message) {
         if (serverConfigurationSupport.handleMessage(message, getSender())) {
             return;
@@ -217,11 +234,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (message instanceof ApplyState) {
             ApplyState applyState = (ApplyState) message;
 
-            long elapsedTime = (System.nanoTime() - applyState.getStartTime());
-            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
-                LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
-                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
-            }
+            long startTime = System.nanoTime();
 
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: Applying state for log index {} data {}",
@@ -232,6 +245,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+            long elapsedTime = System.nanoTime() - startTime;
+            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+                LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+            }
+
             if (!hasFollowers()) {
                 // for single node, the capture should happen after the apply state
                 // as we delete messages from the persistent journal which have made it to the snapshot
@@ -271,8 +290,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // Processing the message may affect the state, hence we need to capture it
             final RaftActorBehavior currentBehavior = getCurrentBehavior();
             final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+            // A behavior indicates that it processed the change by returning a reference to the next behavior
+            // to be used. A null return indicates it has not processed the message and we should be passing it to
+            // the subclass for handling.
             final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
-            switchBehavior(state, nextBehavior);
+            if (nextBehavior != null) {
+                switchBehavior(state, nextBehavior);
+            } else {
+                handleNonRaftCommand(message);
+            }
         }
     }
 
@@ -382,8 +409,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         // Debugging message to retrieve raft stats.
 
         Map<String, String> peerAddresses = new HashMap<>();
-        for(String peerId: context.getPeerIds()) {
-            peerAddresses.put(peerId, context.getPeerAddress(peerId));
+        Map<String, Boolean> peerVotingStates = new HashMap<>();
+        for(PeerInfo info: context.getPeers()) {
+            peerVotingStates.put(info.getId(), info.isVoting());
+            peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
@@ -402,7 +431,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
+                .isVoting(context.isVotingMember())
                 .peerAddresses(peerAddresses)
+                .peerVotingStates(peerVotingStates)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
         ReplicatedLogEntry lastLogEntry = replicatedLog().last();
@@ -418,7 +449,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             for(String id: followerIds) {
                 final FollowerLogInformation info = leader.getFollower(id);
                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
-                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
+                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+                        context.getPeerInfo(info.getId()).isVoting()));
             }
 
             builder.followerInfoList(followerInfoList);
@@ -485,8 +517,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param identifier
      * @param data
      */
-    protected void persistData(final ActorRef clientActor, final String identifier,
-        final Payload data) {
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
@@ -498,28 +529,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-            @Override
-            public void apply(ReplicatedLogEntry replicatedLogEntry) {
-                if (!hasFollowers()){
-                    // Increment the Commit Index and the Last Applied values
-                    raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                    raftContext.setLastApplied(replicatedLogEntry.getIndex());
+        replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+            if (!hasFollowers()){
+                // Increment the Commit Index and the Last Applied values
+                raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
+                raftContext.setLastApplied(replicatedLogEntry1.getIndex());
 
-                    // Apply the state immediately.
-                    self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
+                // Apply the state immediately.
+                self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
 
-                    // Send a ApplyJournalEntries message so that we write the fact that we applied
-                    // the state to durable storage
-                    self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+                // Send a ApplyJournalEntries message so that we write the fact that we applied
+                // the state to durable storage
+                self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
 
-                } else if (clientActor != null) {
-                    context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+            } else if (clientActor != null) {
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
 
-                    // Send message for replication
-                    getCurrentBehavior().handleMessage(getSelf(),
-                            new Replicate(clientActor, identifier, replicatedLogEntry));
-                }
+                // Send message for replication
+                getCurrentBehavior().handleMessage(getSelf(),
+                        new Replicate(clientActor, identifier, replicatedLogEntry1));
             }
         });
     }
@@ -697,8 +725,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param data        A piece of data that was persisted by the persistData call.
      *                    This should NEVER be null.
      */
-    protected abstract void applyState(ActorRef clientActor, String identifier,
-        Object data);
+    protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
 
     /**
      * Returns the RaftActorRecoveryCohort to participate in persistence recovery.