Remove DelegatingRaftActorBehavior 60/36160/11
authorRobert Varga <rovarga@cisco.com>
Tue, 15 Mar 2016 13:32:58 +0000 (14:32 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 29 Mar 2016 13:45:11 +0000 (15:45 +0200)
The delegate is leaked through various methods, implementations of which already have access
to the current behavior if it were available from RaftActorContext. Simplify calling
conventions

Change-Id: I9e27f68e55f28a9afd446abff91fbb38dd26c011
Signed-off-by: Robert Varga <rovarga@cisco.com>
28 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedCandidateOnStartupElectionScenarioTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedLeadersElectionScenarioTest.java

index a36807a8c8a4bb46cc0e7743f8daa1079615e605..fdd4b2395bff7b3e9b8ef746450761a7ec41f323 100644 (file)
@@ -41,7 +41,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -100,12 +99,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
-    /**
-     * The current state determines the current behavior of a RaftActor
-     * A Raft Actor always starts off in the Follower State
-     */
-    private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
-
     /**
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
@@ -143,7 +136,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             delegatingPersistenceProvider, LOG);
 
         context.setPayloadVersion(payloadVersion);
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
     }
 
     @Override
@@ -159,14 +152,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior.getDelegate() != null) {
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
-            }
-        }
-
+        context.close();
         super.postStop();
     }
 
@@ -194,13 +180,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
-        return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
+        return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
     }
 
-    protected void initializeBehavior(){
+    @VisibleForTesting
+    void initializeBehavior(){
         changeCurrentBehavior(new Follower(context));
     }
 
+    @VisibleForTesting
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
         if(getCurrentBehavior() != null) {
             try {
@@ -249,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 // and recovery shows data missing
                 context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
 
-                context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+                context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
         } else if (message instanceof ApplyJournalEntries) {
@@ -315,6 +303,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         shuttingDown = true;
+
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
         if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
@@ -349,9 +339,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void onLeaderTransitioning() {
         LOG.debug("{}: onLeaderTransitioning", persistenceId());
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
-                    currentBehavior.getLeaderPayloadVersion()), getSelf());
+                getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
     }
 
@@ -375,9 +365,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
-    protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
-        return new RaftActorSnapshotMessageSupport(context, currentBehavior,
-                getRaftActorSnapshotCohort());
+    @VisibleForTesting
+    RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
     }
 
     private void onGetOnDemandRaftStats() {
@@ -388,6 +378,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             peerAddresses.put(peerId, context.getPeerAddress(peerId));
         }
 
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
@@ -406,7 +397,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .peerAddresses(peerAddresses)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
-        ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+        ReplicatedLogEntry lastLogEntry = replicatedLog().last();
         if (lastLogEntry != null) {
             builder.lastLogIndex(lastLogEntry.getIndex());
             builder.lastLogTerm(lastLogEntry.getTerm());
@@ -518,7 +509,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
 
                     // Send message for replication
-                    currentBehavior.handleMessage(getSelf(),
+                    getCurrentBehavior().handleMessage(getSelf(),
                             new Replicate(clientActor, identifier, replicatedLogEntry));
                 }
             }
@@ -535,11 +526,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @VisibleForTesting
     void setCurrentBehavior(RaftActorBehavior behavior) {
-        currentBehavior.setDelegate(behavior);
+        context.setCurrentBehavior(behavior);
     }
 
     protected RaftActorBehavior getCurrentBehavior() {
-        return currentBehavior.getDelegate();
+        return context.getCurrentBehavior();
     }
 
     /**
@@ -549,11 +540,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @return true it this RaftActor is a Leader false otherwise
      */
     protected boolean isLeader() {
-        return context.getId().equals(currentBehavior.getLeaderId());
+        return context.getId().equals(getCurrentBehavior().getLeaderId());
     }
 
-    protected boolean isLeaderActive() {
-        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+    protected final boolean isLeaderActive() {
+        return getRaftState() != RaftState.IsolatedLeader && !shuttingDown &&
                 !isLeadershipTransferInProgress();
     }
 
@@ -582,30 +573,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @return the current leader's id
      */
-    protected String getLeaderId(){
-        return currentBehavior.getLeaderId();
-    }
-
-    protected RaftState getRaftState() {
-        return currentBehavior.state();
+    protected final String getLeaderId(){
+        return getCurrentBehavior().getLeaderId();
     }
 
-    protected ReplicatedLogEntry getLastLogEntry() {
-        return replicatedLog().last();
+    @VisibleForTesting
+    protected final RaftState getRaftState() {
+        return getCurrentBehavior().state();
     }
 
     protected Long getCurrentTerm(){
         return context.getTermInformation().getCurrentTerm();
     }
 
-    protected Long getCommitIndex(){
-        return context.getCommitIndex();
-    }
-
-    protected Long getLastApplied(){
-        return context.getLastApplied();
-    }
-
     protected RaftActorContext getRaftActorContext() {
         return context;
     }
@@ -625,7 +605,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
             // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
             // avoids potential disruption. Otherwise, switch to Follower normally.
-            RaftActorBehavior behavior = currentBehavior.getDelegate();
+            RaftActorBehavior behavior = getCurrentBehavior();
             if(behavior instanceof Follower) {
                 String previousLeaderId = ((Follower)behavior).getLeaderId();
                 short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
@@ -757,13 +737,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         operation.run();
     }
 
-    protected void onLeaderChanged(String oldLeader, String newLeader){};
+    protected void onLeaderChanged(String oldLeader, String newLeader) {
+
+    };
 
     private String getLeaderAddress(){
         if(isLeader()){
             return getSelf().path().toString();
         }
-        String leaderId = currentBehavior.getLeaderId();
+        String leaderId = getLeaderId();
         if (leaderId == null) {
             return null;
         }
@@ -783,11 +765,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void captureSnapshot() {
         SnapshotManager snapshotManager = context.getSnapshotManager();
 
-        if(!snapshotManager.isCapturing()) {
+        if (!snapshotManager.isCapturing()) {
+            final long idx = getCurrentBehavior().getReplicatedToAllIndex();
             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
-                replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+                replicatedLog().last(), idx);
 
-            snapshotManager.capture(replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+            snapshotManager.capture(replicatedLog().last(), idx);
         }
     }
 
@@ -879,7 +862,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             if(this.message instanceof SwitchBehavior){
                 return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
             }
-            return currentBehavior.handleMessage(sender, message);
+            return getCurrentBehavior().handleMessage(sender, message);
         }
     }
 }
index 6f941d7dbea306a2f0150aebfe27bc32c01c1ca8..267a4d2b7e995894d8feb7a36261f24653d59e74 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.base.Supplier;
 import java.util.Collection;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
 
@@ -241,4 +242,9 @@ public interface RaftActorContext {
      * @return true if this RaftActor is a voting member of the cluster, false otherwise.
      */
     boolean isVotingMember();
+
+    /**
+     * @return current behavior attached to the raft actor.
+     */
+    RaftActorBehavior getCurrentBehavior();
 }
index 0bf71ab7991b3e87bb4c6afd02577f84d749a730..8b28d5213fbbe45c51bb1a260f5fe324a9a9c40b 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -24,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
 
@@ -64,6 +66,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private boolean votingMember = true;
 
+    private RaftActorBehavior currentBehavior;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
@@ -323,4 +327,23 @@ public class RaftActorContextImpl implements RaftActorContext {
     public boolean isVotingMember() {
         return votingMember;
     }
+
+    @Override
+    public RaftActorBehavior getCurrentBehavior() {
+        return currentBehavior;
+    }
+
+    void setCurrentBehavior(final RaftActorBehavior behavior) {
+        this.currentBehavior = Preconditions.checkNotNull(behavior);
+    }
+
+    void close() {
+        if (currentBehavior != null) {
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.debug("{}: Error closing behavior {}", getId(), currentBehavior.state());
+            }
+        }
+    }
 }
index 923e1f1c0daa9001aef9e4b26139097c34544f3d..ae7fdeadbc12a657c6e6c33189a1b7e36c16d687 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
 
 /**
@@ -30,7 +29,6 @@ import org.slf4j.Logger;
  */
 class RaftActorRecoverySupport {
     private final RaftActorContext context;
-    private final RaftActorBehavior currentBehavior;
     private final RaftActorRecoveryCohort cohort;
 
     private int currentRecoveryBatchCount;
@@ -40,10 +38,8 @@ class RaftActorRecoverySupport {
     private Stopwatch recoveryTimer;
     private final Logger log;
 
-    RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
-            RaftActorRecoveryCohort cohort) {
+    RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
         this.context = context;
-        this.currentBehavior = currentBehavior;
         this.cohort = cohort;
         this.log = context.getLogger();
     }
@@ -167,7 +163,7 @@ class RaftActorRecoverySupport {
         // The replicated log can be used later on to retrieve this snapshot
         // when we need to install it on a peer
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
index 09020812bd5c9212c14c90161b4d49a1be95c537..7012e0db86f8847b3660f78de8cefb3a1f8c7a6a 100644 (file)
@@ -120,7 +120,7 @@ class RaftActorServerConfigurationSupport {
 
     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
-        boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
+        boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
         if(isSelf && !raftContext.hasFollowers()) {
             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
                     raftActor.getSelf());
@@ -316,7 +316,7 @@ class RaftActorServerConfigurationSupport {
             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
             // sure it's meant for us.
             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
-                LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
+                LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
                         applyState.getReplicatedLogEntry().getData());
 
                 timer.cancel();
index 7750cc89230430c4c9e780b8a9f72dac5a0c19de..8b6871174180d8d372bcc2eb075ebef045ba6c3d 100644 (file)
@@ -18,7 +18,6 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.slf4j.Logger;
@@ -33,7 +32,6 @@ class RaftActorSnapshotMessageSupport {
     static final String COMMIT_SNAPSHOT = "commit_snapshot";
 
     private final RaftActorContext context;
-    private final RaftActorBehavior currentBehavior;
     private final RaftActorSnapshotCohort cohort;
     private final Logger log;
 
@@ -53,10 +51,8 @@ class RaftActorSnapshotMessageSupport {
 
     private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS);
 
-    RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
-            RaftActorSnapshotCohort cohort) {
+    RaftActorSnapshotMessageSupport(final RaftActorContext context, final RaftActorSnapshotCohort cohort) {
         this.context = context;
-        this.currentBehavior = currentBehavior;
         this.cohort = cohort;
         this.log = context.getLogger();
 
@@ -74,7 +70,7 @@ class RaftActorSnapshotMessageSupport {
         } else if (message instanceof CaptureSnapshotReply) {
             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if (message.equals(COMMIT_SNAPSHOT)) {
-            context.getSnapshotManager().commit(-1, currentBehavior);
+            context.getSnapshotManager().commit(-1);
         } else if (message instanceof GetSnapshot) {
             onGetSnapshot(sender);
         } else {
@@ -87,7 +83,7 @@ class RaftActorSnapshotMessageSupport {
     private void onCaptureSnapshotReply(byte[] snapshotBytes) {
         log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
 
-        context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
+        context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory());
     }
 
     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
@@ -102,7 +98,7 @@ class RaftActorSnapshotMessageSupport {
 
         long sequenceNumber = success.metadata().sequenceNr();
 
-        context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
+        context.getSnapshotManager().commit(sequenceNumber);
     }
 
     private void onApplySnapshot(ApplySnapshot message) {
index 6b4427d5ce39078c772e0ea5f33953cc421712c8..90042907a5f79e2d13d590af19d3782a8a269548 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 /**
  * Implementation of ReplicatedLog used by the RaftActor.
@@ -22,7 +21,6 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     private long dataSizeSinceLastSnapshot = 0L;
     private final RaftActorContext context;
-    private final RaftActorBehavior currentBehavior;
 
     private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
         @Override
@@ -30,22 +28,19 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         }
     };
 
-    static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context,
-            final RaftActorBehavior currentBehavior) {
+    static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) {
         return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries(), context, currentBehavior);
+                snapshot.getUnAppliedEntries(), context);
     }
 
-    static ReplicatedLog newInstance(final RaftActorContext context, final RaftActorBehavior currentBehavior) {
-        return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
-                currentBehavior);
+    static ReplicatedLog newInstance(final RaftActorContext context) {
+        return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context);
     }
 
     private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List<ReplicatedLogEntry> unAppliedEntries,
-            final RaftActorContext context, final RaftActorBehavior currentBehavior) {
+            final RaftActorContext context) {
         super(snapshotIndex, snapshotTerm, unAppliedEntries);
         this.context = Preconditions.checkNotNull(context);
-        this.currentBehavior = Preconditions.checkNotNull(currentBehavior);
     }
 
     @Override
@@ -72,7 +67,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
                 || getDataSizeForSnapshotCheck() > dataThreshold)) {
 
             boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
-                    currentBehavior.getReplicatedToAllIndex());
+                    context.getCurrentBehavior().getReplicatedToAllIndex());
             if (started) {
                 if (!context.hasFollowers()) {
                     dataSizeSinceLastSnapshot = 0;
index 197fa867156265eb0b7ddda8bbab7586a40dea3b..7109980f3d5f33ca735853544b42ad718985e99b 100644 (file)
@@ -72,13 +72,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
-        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
+    public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        currentState.persist(snapshotBytes, totalMemory);
     }
 
     @Override
-    public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-        currentState.commit(sequenceNumber, currentBehavior);
+    public void commit(final long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -87,8 +87,8 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-        return currentState.trimLog(desiredTrimIndex, currentBehavior);
+    public long trimLog(final long desiredTrimIndex) {
+        return currentState.trimLog(desiredTrimIndex);
     }
 
     public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
@@ -172,12 +172,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        public void commit(final long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -187,12 +187,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        public long trimLog(final long desiredTrimIndex) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+        protected long doTrimLog(final long desiredTrimIndex) {
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
@@ -211,7 +211,10 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
-            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+            }
+
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+            if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
                 // It's possible a follower was lagging and an install snapshot advanced its match index past
                 // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
                 // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
@@ -286,15 +289,15 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-            return doTrimLog(desiredTrimIndex, currentBehavior);
+        public long trimLog(final long desiredTrimIndex) {
+            return doTrimLog(desiredTrimIndex);
         }
     }
 
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -316,6 +319,7 @@ public class SnapshotManager implements SnapshotState {
             boolean logSizeExceededSnapshotBatchCount =
                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
 
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
                 if(LOG.isDebugEnabled()) {
                     if(dataSizeThresholdExceeded) {
@@ -381,7 +385,7 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        public void commit(final long sequenceNumber) {
             LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
             if(applySnapshot != null) {
@@ -389,7 +393,7 @@ public class SnapshotManager implements SnapshotState {
                     Snapshot snapshot = applySnapshot.getSnapshot();
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
-                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
                     context.setLastApplied(snapshot.getLastAppliedIndex());
                     context.setCommitIndex(snapshot.getLastAppliedIndex());
                     context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
index f5a175f3891de08defa6a0a7cefccf816fbf8213..5d1304fe752222214a82b9d877da996dc517feb4 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.raft;
 
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 public interface SnapshotState {
     /**
@@ -52,14 +51,14 @@ public interface SnapshotState {
      * @param currentBehavior
      * @param totalMemory
      */
-    void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory);
+    void persist(byte[] snapshotBytes, long totalMemory);
 
     /**
      * Commit the snapshot by trimming the log
      *
      * @param sequenceNumber
      */
-    void commit(long sequenceNumber, RaftActorBehavior currentBehavior);
+    void commit(long sequenceNumber);
 
     /**
      * Rollback the snapshot
@@ -72,5 +71,5 @@ public interface SnapshotState {
      * @param desiredTrimIndex
      * @return the actual trim index
      */
-    long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
+    long trimLog(long desiredTrimIndex);
 }
index 5394d65e2a347d27b9a119207a211aa914e6c5cf..237e5eeaae1e001388fad692520c1b64d0a43125 100644 (file)
@@ -495,7 +495,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param snapshotCapturedIndex
      */
     protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
-        long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
+        long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
 
         if(actualIndex != -1){
             setReplicatedToAllIndex(actualIndex);
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java
deleted file mode 100644 (file)
index 175e16f..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.raft.behaviors;
-
-import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.RaftState;
-
-/**
- * A RaftActorBehavior implementation that delegates to another implementation.
- *
- * @author Thomas Pantelis
- */
-public class DelegatingRaftActorBehavior implements RaftActorBehavior {
-    private RaftActorBehavior delegate;
-
-    public RaftActorBehavior getDelegate() {
-        return delegate;
-    }
-
-    public void setDelegate(RaftActorBehavior delegate) {
-        this.delegate = delegate;
-    }
-
-    @Override
-    public void close() throws Exception {
-        delegate.close();
-    }
-
-    @Override
-    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
-        return delegate.handleMessage(sender, message);
-    }
-
-    @Override
-    public RaftState state() {
-        return delegate.state();
-    }
-
-    @Override
-    public String getLeaderId() {
-        return delegate.getLeaderId();
-    }
-
-    @Override
-    public void setReplicatedToAllIndex(long replicatedToAllIndex) {
-        delegate.setReplicatedToAllIndex(replicatedToAllIndex);
-    }
-
-    @Override
-    public long getReplicatedToAllIndex() {
-        return delegate.getReplicatedToAllIndex();
-    }
-
-    @Override
-    public short getLeaderPayloadVersion() {
-        return delegate.getLeaderPayloadVersion();
-    }
-
-    @Override
-    public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
-        return delegate.switchBehavior(behavior);
-    }
-}
index 25d14a2b193120c4c5a98021f8c7ed76f5c42a45..34572653c360ba89eab21393cfc1823a401662f4 100644 (file)
@@ -17,6 +17,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
@@ -318,4 +319,9 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             return this.mockLog;
         }
     }
+
+    @Override
+    public void setCurrentBehavior(final RaftActorBehavior behavior) {
+        super.setCurrentBehavior(behavior);
+    }
 }
index 71ca4cae5df490d6d6d5faa00954b167dc614fb9..b4497fd298487e3636387a4ca0bd366dd9b94246 100644 (file)
@@ -40,7 +40,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,8 +56,6 @@ public class RaftActorRecoverySupportTest {
     @Mock
     private DataPersistenceProvider mockPersistence;
 
-    @Mock
-    private RaftActorBehavior mockBehavior;
 
     @Mock
     private RaftActorRecoveryCohort mockCohort;
@@ -80,11 +77,11 @@ public class RaftActorRecoverySupportTest {
         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
                 -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
 
-        support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
+        support = new RaftActorRecoverySupport(context, mockCohort);
 
         doReturn(true).when(mockPersistence).isRecoveryApplicable();
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
     }
 
     private void sendMessageToSupport(Object message) {
index bdfb2b20c8192e5d6f4269a16147d33ec0763431..514346f1ab30334417b3f40c360759db1d5a7a3a 100644 (file)
@@ -135,7 +135,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     public void testAddServerWithExistingFollower() throws Exception {
         LOG.info("testAddServerWithExistingFollower starting");
         setupNewFollower();
-        RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
+        RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
                 0, 3, 1).build());
         followerActorContext.setCommitIndex(2);
@@ -143,6 +143,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
@@ -1441,7 +1442,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
     }
 
-    private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
+    private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
index 94000d07121944f2b47dee06587aa220844ae3b3..32b2e465f6fcf83d32a6e9f213caee16ea020f8a 100644 (file)
@@ -71,11 +71,11 @@ public class RaftActorSnapshotMessageSupportTest {
             }
         };
 
-        support = new RaftActorSnapshotMessageSupport(context, mockBehavior, mockCohort);
+        support = new RaftActorSnapshotMessageSupport(context, mockCohort);
 
         doReturn(true).when(mockPersistence).isRecoveryApplicable();
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
     }
 
     private void sendMessageToSupport(Object message) {
@@ -109,7 +109,7 @@ public class RaftActorSnapshotMessageSupportTest {
         byte[] snapshot = {1,2,3,4,5};
         sendMessageToSupport(new CaptureSnapshotReply(snapshot));
 
-        verify(mockSnapshotManager).persist(same(snapshot), same(mockBehavior), anyLong());
+        verify(mockSnapshotManager).persist(same(snapshot), anyLong());
     }
 
     @Test
@@ -118,7 +118,7 @@ public class RaftActorSnapshotMessageSupportTest {
         long sequenceNumber = 100;
         sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
 
-        verify(mockSnapshotManager).commit(eq(sequenceNumber), same(mockBehavior));
+        verify(mockSnapshotManager).commit(eq(sequenceNumber));
     }
 
     @Test
@@ -135,7 +135,7 @@ public class RaftActorSnapshotMessageSupportTest {
 
         sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-        verify(mockSnapshotManager).commit(eq(-1L), same(mockBehavior));
+        verify(mockSnapshotManager).commit(eq(-1L));
     }
 
     @Test
index d37681e872528be1bedc38d42b0ce094599295e7..99b17fe8ab2ac9d8b303504f98bde766dae6cf60 100644 (file)
@@ -657,12 +657,12 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-4")));
 
                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
-                        leader, Runtime.getRuntime().totalMemory());
+                        Runtime.getRuntime().totalMemory());
 
                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -765,7 +765,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
+                followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
index 51cff356f2cdda165231df6b14375e776a1c715f..8f674cca3bac0968532da1fcdf22a9482105288f 100644 (file)
@@ -76,7 +76,7 @@ public class ReplicatedLogImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testAppendAndPersistExpectingNoCapture() throws Exception {
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
 
         MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
 
@@ -104,7 +104,7 @@ public class ReplicatedLogImplTest {
 
         doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
 
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
 
         MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
         MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
@@ -132,7 +132,7 @@ public class ReplicatedLogImplTest {
             }
         });
 
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
 
         int dataSize = 600;
         MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
@@ -153,7 +153,7 @@ public class ReplicatedLogImplTest {
     @Test
     public void testRemoveFromAndPersist() throws Exception {
 
-        ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+        ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
 
         log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
         log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
index 27596dff74f6a0b83ebab2775cf507d7515900fb..e6f7bd7b9ed9cc1975037309c2caab99633e633d 100644 (file)
@@ -81,6 +81,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
         doReturn("123").when(mockRaftActorContext).getId();
         doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
+        doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior();
         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
 
         doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
@@ -247,7 +248,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(lastLogEntry, -1);
 
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -280,7 +281,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload()), 9);
 
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -307,7 +308,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
                 new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -333,7 +334,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, 2000000L);
+        snapshotManager.persist(new byte[]{}, 2000000L);
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -354,7 +355,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
 
-        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -374,7 +375,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCallingPersistWithoutCaptureWillDoNothing(){
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
 
@@ -390,9 +391,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -409,11 +410,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L);
 
         assertEquals(false, snapshotManager.isCapturing());
 
@@ -438,7 +439,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -450,7 +451,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCommitBeforeCapture(){
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -468,11 +469,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L);
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L);
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
@@ -487,7 +488,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -521,7 +522,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -539,7 +540,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", 10L, retIndex);
 
         verify(mockReplicatedLog).snapshotPreCommit(10, 5);
@@ -557,7 +558,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", -1L, retIndex);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
@@ -575,7 +576,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", -1L, retIndex);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
@@ -590,7 +591,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         doReturn(false).when(mockReplicatedLog).isPresent(10);
 
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", -1L, retIndex);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
@@ -615,7 +616,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        snapshotManager.trimLog(10, mockRaftActorBehavior);
+        snapshotManager.trimLog(10);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
         verify(mockReplicatedLog, never()).snapshotCommit();
@@ -637,7 +638,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        snapshotManager.trimLog(10, mockRaftActorBehavior);
+        snapshotManager.trimLog(10);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
         verify(mockReplicatedLog, never()).snapshotCommit();
index 63d9643e3cc2dce319e79fe3d69182792a8dd334..31f2d629641fd92820c549c9dcaf2a7eb3a08beb 100644 (file)
@@ -25,7 +25,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -199,13 +198,14 @@ public class AbstractLeaderElectionScenarioTest {
         assertEquals(name + " behavior state", expState, actor.behavior.state());
     }
 
-    void initializeLeaderBehavior(MemberActor actor, RaftActorContext context, int numActiveFollowers) throws Exception {
+    void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers) throws Exception {
         // Leader sends immediate heartbeats - we don't care about it so ignore it.
 
         actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
 
-        @SuppressWarnings("resource")
         Leader leader = new Leader(context);
+        context.setCurrentBehavior(leader);
+
         actor.waitForExpectedMessages(AppendEntriesReply.class);
         // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
         actor.behavior = leader;
index dd3ed2347c911bdf429193c147bfed11011dbe54..003964a0a1a0e881dd80e59722a8b7795591f4ed 100644 (file)
@@ -25,7 +25,7 @@ import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorAc
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.duration.FiniteDuration;
 
-public abstract class AbstractLeaderTest extends AbstractRaftActorBehaviorTest{
+public abstract class AbstractLeaderTest<T extends AbstractLeader> extends AbstractRaftActorBehaviorTest<T> {
 
     /**
      * When we removed scheduling of heartbeat in the AbstractLeader constructor we ended up with a situation where
index 496f7b071a927d9475c58cbd5b042a885f085b9e..30ca63b0634a74c7ecbe0887ba0e8452f72b6176 100644 (file)
@@ -40,7 +40,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
+public abstract class AbstractRaftActorBehaviorTest<T extends RaftActorBehavior> extends AbstractActorTest {
 
     protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
@@ -66,7 +66,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      */
     @Test
     public void testHandleRaftRPCWithNewerTerm() throws Exception {
-        RaftActorContext actorContext = createActorContext();
+        MockRaftActorContext actorContext = createActorContext();
 
         assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
                 createAppendEntriesWithNewerTerm());
@@ -212,7 +212,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      */
     @Test
     public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
-        RaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
 
         context.getTermInformation().update(1000, null);
 
@@ -272,11 +272,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
 
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
 
         Payload p = new MockRaftActorContext.MockPayload("");
-        setLastLogEntry((MockRaftActorContext) actorContext, 1, 0, p);
+        setLastLogEntry(actorContext, 1, 0, p);
         actorContext.getTermInformation().update(1, "test");
 
         RaftActorBehavior origBehavior = createBehavior(actorContext);
@@ -304,8 +304,13 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         return log;
     }
 
-    protected abstract RaftActorBehavior createBehavior(
-        RaftActorContext actorContext);
+    protected abstract T createBehavior(RaftActorContext actorContext);
+
+    protected final T createBehavior(MockRaftActorContext actorContext) {
+        T ret = createBehavior((RaftActorContext)actorContext);
+        actorContext.setCurrentBehavior(ret);
+        return ret;
+    }
 
     protected RaftActorBehavior createBehavior() {
         return createBehavior(createActorContext());
index 73e0a165156082787f998259d054c26a07b49222..6e5c9315025a9ab68950245eafee0861bf697547 100644 (file)
@@ -43,7 +43,7 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CandidateTest extends AbstractRaftActorBehaviorTest {
+public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
     static final Logger LOG = LoggerFactory.getLogger(CandidateTest.class);
 
     private final TestActorRef<MessageCollectorActor> candidateActor = actorFactory.createTestActor(
@@ -325,7 +325,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+    protected Candidate createBehavior(final RaftActorContext actorContext) {
         return new Candidate(actorContext);
     }
 
@@ -333,7 +333,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         return new MockRaftActorContext("candidate", getSystem(), candidateActor);
     }
 
-    private Map<String, String> setupPeers(int count) {
+    private Map<String, String> setupPeers(final int count) {
         Map<String, String> peerMap = new HashMap<>();
         peerActors = new TestActorRef[count];
         for(int i = 0; i < count; i++) {
@@ -346,8 +346,8 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
-            ActorRef actorRef, RaftRPC rpc) throws Exception {
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+            final ActorRef actorRef, final RaftRPC rpc) throws Exception {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
     }
index 000456bd57937e8c813e64f603d58f3b75379ce5..a1301adb7c29be8af54d9d34da927aab51bdd64e 100644 (file)
@@ -183,6 +183,7 @@ public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionS
         member2Context.setConfigParams(member2ConfigParams);
 
         member2Actor.behavior = new Follower(member2Context);
+        member2Context.setCurrentBehavior(member2Actor.behavior);
 
         // Create member 3's behavior initially as Follower
 
@@ -195,6 +196,7 @@ public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionS
         member3Context.setConfigParams(member3ConfigParams);
 
         member3Actor.behavior = new Follower(member3Context);
+        member3Context.setCurrentBehavior(member3Actor.behavior);
 
         // Create member 1's behavior initially as Leader
 
index d5d9e88098e5bde5adaa493562ff4c09b6ce7ec8..c10ff003566198d270a0826754b890f47380fc66 100644 (file)
@@ -50,7 +50,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.duration.FiniteDuration;
 
-public class FollowerTest extends AbstractRaftActorBehaviorTest {
+public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
@@ -113,7 +113,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
 
-        RaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
         long term = 1000;
         context.getTermInformation().update(term, null);
 
@@ -132,7 +132,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
 
-        RaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
         long term = 1000;
         context.getTermInformation().update(term, "test");
 
@@ -1034,7 +1034,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
 
@@ -1043,7 +1043,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
+    protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
             throws Exception {
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
         assertEquals("isSuccess", true, reply.isSuccess());
index 0eb58d427afa33c62a8c73c4e77b3aeb21db84a4..a48efb21f3cc82723e9ce953ecc4d31d14d54028 100644 (file)
@@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
-public class IsolatedLeaderTest  extends AbstractLeaderTest {
+public class IsolatedLeaderTest extends AbstractLeaderTest<IsolatedLeader> {
 
     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
@@ -43,7 +43,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
     }
 
     @Override
-    protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+    protected IsolatedLeader createBehavior(RaftActorContext actorContext) {
         return new IsolatedLeader(actorContext);
     }
 
@@ -73,6 +73,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         leaderActorContext.setPeerAddresses(peerAddresses);
 
         isolatedLeader = new IsolatedLeader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(isolatedLeader);
         assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state());
 
         // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
@@ -108,6 +109,7 @@ public class IsolatedLeaderTest  extends AbstractLeaderTest {
         leaderActorContext.setPeerAddresses(peerAddresses);
 
         isolatedLeader = new IsolatedLeader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(isolatedLeader);
         assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state());
 
         // in a 5 member cluster, atleast 2 followers need to be active and return a reply
index b0d220a0d6fbd31db20992b2abd011bb20196d86..9d8144c4396fc195541706306debb21bf8060296 100644 (file)
@@ -64,7 +64,7 @@ import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorAc
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.duration.FiniteDuration;
 
-public class LeaderTest extends AbstractLeaderTest {
+public class LeaderTest extends AbstractLeaderTest<Leader> {
 
     static final String FOLLOWER_ID = "follower";
     public static final String LEADER_ID = "leader";
@@ -112,6 +112,7 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(term, "");
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         // Leader should send an immediate heartbeat with no entries as follower is inactive.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
@@ -208,6 +209,7 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(newTerm, "");
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         // Leader will send an immediate heartbeat - ignore it.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -894,6 +896,7 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
@@ -962,6 +965,7 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
@@ -1190,8 +1194,8 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
     }
 
-    @Override protected RaftActorBehavior createBehavior(
-        RaftActorContext actorContext) {
+    @Override
+    protected Leader createBehavior(final RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
@@ -1241,6 +1245,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         Map<String, String> peerAddresses = new HashMap<>();
         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -1264,6 +1269,7 @@ public class LeaderTest extends AbstractLeaderTest {
         followerActorContext.setCommitIndex(1);
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
@@ -1295,6 +1301,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         Map<String, String> leaderPeerAddresses = new HashMap<>();
         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -1462,6 +1469,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
@@ -1477,6 +1485,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
         leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
@@ -1541,6 +1550,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
@@ -1556,6 +1566,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
         leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
@@ -1738,6 +1749,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
@@ -1753,6 +1765,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
 
         leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
@@ -1964,6 +1977,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Ignore initial heartbeats
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2020,6 +2034,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2059,6 +2074,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2090,6 +2106,7 @@ public class LeaderTest extends AbstractLeaderTest {
                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2128,6 +2145,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2157,7 +2175,7 @@ public class LeaderTest extends AbstractLeaderTest {
     }
 
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
index 376c01e500930b7a5ada41591116cad9460d19ae..a323aa78ae3dbb0550740a055900546c02c09769 100644 (file)
@@ -181,6 +181,7 @@ public class PartitionedCandidateOnStartupElectionScenarioTest extends AbstractL
 
         Candidate member3Behavior = new Candidate(member3Context);
         member3Actor.behavior = member3Behavior;
+        member3Context.setCurrentBehavior(member3Behavior);
 
         // Send several additional ElectionTimeouts to Candidate member 3. Each ElectionTimeout will
         // start a new term so Candidate member 3's current term will be greater than the leader's
@@ -219,6 +220,7 @@ public class PartitionedCandidateOnStartupElectionScenarioTest extends AbstractL
         member2Context.setConfigParams(member2ConfigParams);
 
         member2Actor.behavior = new Follower(member2Context);
+        member2Context.setCurrentBehavior(member2Actor.behavior);
 
         // Create member 1's behavior as Leader.
 
index 83f7a75c9cc368157d018429892df5467a3dca51..11cc1f0a6d87b70f49ba8482431dae46ca08555e 100644 (file)
@@ -287,6 +287,7 @@ public class PartitionedLeadersElectionScenarioTest extends AbstractLeaderElecti
         member2Context.setConfigParams(member2ConfigParams);
 
         member2Actor.behavior = new Follower(member2Context);
+        member2Context.setCurrentBehavior(member2Actor.behavior);
 
         // Create member 3's behavior initially as Follower
 
@@ -299,6 +300,7 @@ public class PartitionedLeadersElectionScenarioTest extends AbstractLeaderElecti
         member3Context.setConfigParams(member3ConfigParams);
 
         member3Actor.behavior = new Follower(member3Context);
+        member3Context.setCurrentBehavior(member3Actor.behavior);
 
         // Create member 1's behavior initially as Leader