From: Robert Varga Date: Tue, 15 Mar 2016 13:32:58 +0000 (+0100) Subject: Remove DelegatingRaftActorBehavior X-Git-Tag: release/boron~288 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=24ace09aacc620fd9768e0a7004e802f9385bcfc Remove DelegatingRaftActorBehavior The delegate is leaked through various methods, implementations of which already have access to the current behavior if it were available from RaftActorContext. Simplify calling conventions Change-Id: I9e27f68e55f28a9afd446abff91fbb38dd26c011 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index a36807a8c8..fdd4b2395b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -41,7 +41,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; -import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -100,12 +99,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected final Logger LOG = LoggerFactory.getLogger(getClass()); - /** - * The current state determines the current behavior of a RaftActor - * A Raft Actor always starts off in the Follower State - */ - private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior(); - /** * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors @@ -143,7 +136,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { delegatingPersistenceProvider, LOG); context.setPayloadVersion(payloadVersion); - context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior)); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); } @Override @@ -159,14 +152,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void postStop() { - if(currentBehavior.getDelegate() != null) { - try { - currentBehavior.close(); - } catch (Exception e) { - LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state()); - } - } - + context.close(); super.postStop(); } @@ -194,13 +180,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected RaftActorRecoverySupport newRaftActorRecoverySupport() { - return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort()); + return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort()); } - protected void initializeBehavior(){ + @VisibleForTesting + void initializeBehavior(){ changeCurrentBehavior(new Follower(context)); } + @VisibleForTesting protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ if(getCurrentBehavior() != null) { try { @@ -249,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // and recovery shows data missing context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry()); - context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior); + context.getSnapshotManager().trimLog(context.getLastApplied()); } } else if (message instanceof ApplyJournalEntries) { @@ -315,6 +303,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } shuttingDown = true; + + final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override @@ -349,9 +339,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onLeaderTransitioning() { LOG.debug("{}: onLeaderTransitioning", persistenceId()); Optional roleChangeNotifier = getRoleChangeNotifier(); - if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) { + if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, - currentBehavior.getLeaderPayloadVersion()), getSelf()); + getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } @@ -375,9 +365,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); } - protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { - return new RaftActorSnapshotMessageSupport(context, currentBehavior, - getRaftActorSnapshotCohort()); + @VisibleForTesting + RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { + return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort()); } private void onGetOnDemandRaftStats() { @@ -388,6 +378,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { peerAddresses.put(peerId, context.getPeerAddress(peerId)); } + final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); OnDemandRaftState.Builder builder = OnDemandRaftState.builder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) @@ -406,7 +397,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .peerAddresses(peerAddresses) .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass()); - ReplicatedLogEntry lastLogEntry = getLastLogEntry(); + ReplicatedLogEntry lastLogEntry = replicatedLog().last(); if (lastLogEntry != null) { builder.lastLogIndex(lastLogEntry.getIndex()); builder.lastLogTerm(lastLogEntry.getTerm()); @@ -518,7 +509,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); // Send message for replication - currentBehavior.handleMessage(getSelf(), + getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry)); } } @@ -535,11 +526,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @VisibleForTesting void setCurrentBehavior(RaftActorBehavior behavior) { - currentBehavior.setDelegate(behavior); + context.setCurrentBehavior(behavior); } protected RaftActorBehavior getCurrentBehavior() { - return currentBehavior.getDelegate(); + return context.getCurrentBehavior(); } /** @@ -549,11 +540,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @return true it this RaftActor is a Leader false otherwise */ protected boolean isLeader() { - return context.getId().equals(currentBehavior.getLeaderId()); + return context.getId().equals(getCurrentBehavior().getLeaderId()); } - protected boolean isLeaderActive() { - return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && + protected final boolean isLeaderActive() { + return getRaftState() != RaftState.IsolatedLeader && !shuttingDown && !isLeadershipTransferInProgress(); } @@ -582,30 +573,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @return the current leader's id */ - protected String getLeaderId(){ - return currentBehavior.getLeaderId(); - } - - protected RaftState getRaftState() { - return currentBehavior.state(); + protected final String getLeaderId(){ + return getCurrentBehavior().getLeaderId(); } - protected ReplicatedLogEntry getLastLogEntry() { - return replicatedLog().last(); + @VisibleForTesting + protected final RaftState getRaftState() { + return getCurrentBehavior().state(); } protected Long getCurrentTerm(){ return context.getTermInformation().getCurrentTerm(); } - protected Long getCommitIndex(){ - return context.getCommitIndex(); - } - - protected Long getLastApplied(){ - return context.getLastApplied(); - } - protected RaftActorContext getRaftActorContext() { return context; } @@ -625,7 +605,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This // avoids potential disruption. Otherwise, switch to Follower normally. - RaftActorBehavior behavior = currentBehavior.getDelegate(); + RaftActorBehavior behavior = getCurrentBehavior(); if(behavior instanceof Follower) { String previousLeaderId = ((Follower)behavior).getLeaderId(); short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion(); @@ -757,13 +737,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { operation.run(); } - protected void onLeaderChanged(String oldLeader, String newLeader){}; + protected void onLeaderChanged(String oldLeader, String newLeader) { + + }; private String getLeaderAddress(){ if(isLeader()){ return getSelf().path().toString(); } - String leaderId = currentBehavior.getLeaderId(); + String leaderId = getLeaderId(); if (leaderId == null) { return null; } @@ -783,11 +765,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void captureSnapshot() { SnapshotManager snapshotManager = context.getSnapshotManager(); - if(!snapshotManager.isCapturing()) { + if (!snapshotManager.isCapturing()) { + final long idx = getCurrentBehavior().getReplicatedToAllIndex(); LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", - replicatedLog().last(), currentBehavior.getReplicatedToAllIndex()); + replicatedLog().last(), idx); - snapshotManager.capture(replicatedLog().last(), currentBehavior.getReplicatedToAllIndex()); + snapshotManager.capture(replicatedLog().last(), idx); } } @@ -879,7 +862,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if(this.message instanceof SwitchBehavior){ return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState()); } - return currentBehavior.handleMessage(sender, message); + return getCurrentBehavior().handleMessage(sender, message); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 6f941d7dbe..267a4d2b7e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -17,6 +17,7 @@ import com.google.common.base.Supplier; import java.util.Collection; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; @@ -241,4 +242,9 @@ public interface RaftActorContext { * @return true if this RaftActor is a voting member of the cluster, false otherwise. */ boolean isVotingMember(); + + /** + * @return current behavior attached to the raft actor. + */ + RaftActorBehavior getCurrentBehavior(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 0bf71ab799..8b28d5213f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -14,6 +14,7 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import java.util.ArrayList; import java.util.Collection; @@ -24,6 +25,7 @@ import java.util.Map; import java.util.Set; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; @@ -64,6 +66,8 @@ public class RaftActorContextImpl implements RaftActorContext { private boolean votingMember = true; + private RaftActorBehavior currentBehavior; + public RaftActorContextImpl(ActorRef actor, ActorContext context, String id, ElectionTerm termInformation, long commitIndex, long lastApplied, Map peerAddresses, ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) { @@ -323,4 +327,23 @@ public class RaftActorContextImpl implements RaftActorContext { public boolean isVotingMember() { return votingMember; } + + @Override + public RaftActorBehavior getCurrentBehavior() { + return currentBehavior; + } + + void setCurrentBehavior(final RaftActorBehavior behavior) { + this.currentBehavior = Preconditions.checkNotNull(behavior); + } + + void close() { + if (currentBehavior != null) { + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.debug("{}: Error closing behavior {}", getId(), currentBehavior.state()); + } + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 923e1f1c0d..ae7fdeadbc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.slf4j.Logger; /** @@ -30,7 +29,6 @@ import org.slf4j.Logger; */ class RaftActorRecoverySupport { private final RaftActorContext context; - private final RaftActorBehavior currentBehavior; private final RaftActorRecoveryCohort cohort; private int currentRecoveryBatchCount; @@ -40,10 +38,8 @@ class RaftActorRecoverySupport { private Stopwatch recoveryTimer; private final Logger log; - RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior, - RaftActorRecoveryCohort cohort) { + RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) { this.context = context; - this.currentBehavior = currentBehavior; this.cohort = cohort; this.log = context.getLogger(); } @@ -167,7 +163,7 @@ class RaftActorRecoverySupport { // The replicated log can be used later on to retrieve this snapshot // when we need to install it on a peer - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior)); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context)); context.setLastApplied(snapshot.getLastAppliedIndex()); context.setCommitIndex(snapshot.getLastAppliedIndex()); context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 09020812bd..7012e0db86 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -120,7 +120,7 @@ class RaftActorServerConfigurationSupport { private void onRemoveServer(RemoveServer removeServer, ActorRef sender) { LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState); - boolean isSelf = removeServer.getServerId().equals(raftActor.getId()); + boolean isSelf = removeServer.getServerId().equals(raftContext.getId()); if(isSelf && !raftContext.hasFollowers()) { sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf()); @@ -316,7 +316,7 @@ class RaftActorServerConfigurationSupport { // Sanity check - we could get an ApplyState from a previous operation that timed out so make // sure it's meant for us. if(operationContext.getContextId().equals(applyState.getIdentifier())) { - LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(), + LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(), applyState.getReplicatedLogEntry().getData()); timer.cancel(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 7750cc8923..8b68711741 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -18,7 +18,6 @@ import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply; import org.slf4j.Logger; @@ -33,7 +32,6 @@ class RaftActorSnapshotMessageSupport { static final String COMMIT_SNAPSHOT = "commit_snapshot"; private final RaftActorContext context; - private final RaftActorBehavior currentBehavior; private final RaftActorSnapshotCohort cohort; private final Logger log; @@ -53,10 +51,8 @@ class RaftActorSnapshotMessageSupport { private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS); - RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior, - RaftActorSnapshotCohort cohort) { + RaftActorSnapshotMessageSupport(final RaftActorContext context, final RaftActorSnapshotCohort cohort) { this.context = context; - this.currentBehavior = currentBehavior; this.cohort = cohort; this.log = context.getLogger(); @@ -74,7 +70,7 @@ class RaftActorSnapshotMessageSupport { } else if (message instanceof CaptureSnapshotReply) { onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else if (message.equals(COMMIT_SNAPSHOT)) { - context.getSnapshotManager().commit(-1, currentBehavior); + context.getSnapshotManager().commit(-1); } else if (message instanceof GetSnapshot) { onGetSnapshot(sender); } else { @@ -87,7 +83,7 @@ class RaftActorSnapshotMessageSupport { private void onCaptureSnapshotReply(byte[] snapshotBytes) { log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length); - context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory()); + context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory()); } private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) { @@ -102,7 +98,7 @@ class RaftActorSnapshotMessageSupport { long sequenceNumber = success.metadata().sequenceNr(); - context.getSnapshotManager().commit(sequenceNumber, currentBehavior); + context.getSnapshotManager().commit(sequenceNumber); } private void onApplySnapshot(ApplySnapshot message) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 6b4427d5ce..90042907a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -12,7 +12,6 @@ import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; /** * Implementation of ReplicatedLog used by the RaftActor. @@ -22,7 +21,6 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private long dataSizeSinceLastSnapshot = 0L; private final RaftActorContext context; - private final RaftActorBehavior currentBehavior; private final Procedure deleteProcedure = new Procedure() { @Override @@ -30,22 +28,19 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } }; - static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context, - final RaftActorBehavior currentBehavior) { + static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) { return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), - snapshot.getUnAppliedEntries(), context, currentBehavior); + snapshot.getUnAppliedEntries(), context); } - static ReplicatedLog newInstance(final RaftActorContext context, final RaftActorBehavior currentBehavior) { - return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context, - currentBehavior); + static ReplicatedLog newInstance(final RaftActorContext context) { + return new ReplicatedLogImpl(-1L, -1L, Collections.emptyList(), context); } private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List unAppliedEntries, - final RaftActorContext context, final RaftActorBehavior currentBehavior) { + final RaftActorContext context) { super(snapshotIndex, snapshotTerm, unAppliedEntries); this.context = Preconditions.checkNotNull(context); - this.currentBehavior = Preconditions.checkNotNull(currentBehavior); } @Override @@ -72,7 +67,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { || getDataSizeForSnapshotCheck() > dataThreshold)) { boolean started = context.getSnapshotManager().capture(replicatedLogEntry, - currentBehavior.getReplicatedToAllIndex()); + context.getCurrentBehavior().getReplicatedToAllIndex()); if (started) { if (!context.hasFollowers()) { dataSizeSinceLastSnapshot = 0; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 197fa86715..7109980f3d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -72,13 +72,13 @@ public class SnapshotManager implements SnapshotState { } @Override - public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { - currentState.persist(snapshotBytes, currentBehavior, totalMemory); + public void persist(final byte[] snapshotBytes, final long totalMemory) { + currentState.persist(snapshotBytes, totalMemory); } @Override - public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) { - currentState.commit(sequenceNumber, currentBehavior); + public void commit(final long sequenceNumber) { + currentState.commit(sequenceNumber); } @Override @@ -87,8 +87,8 @@ public class SnapshotManager implements SnapshotState { } @Override - public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { - return currentState.trimLog(desiredTrimIndex, currentBehavior); + public long trimLog(final long desiredTrimIndex) { + return currentState.trimLog(desiredTrimIndex); } public void setCreateSnapshotCallable(Procedure createSnapshotProcedure) { @@ -172,12 +172,12 @@ public class SnapshotManager implements SnapshotState { } @Override - public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { + public void persist(final byte[] snapshotBytes, final long totalMemory) { LOG.debug("persist should not be called in state {}", this); } @Override - public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) { + public void commit(final long sequenceNumber) { LOG.debug("commit should not be called in state {}", this); } @@ -187,12 +187,12 @@ public class SnapshotManager implements SnapshotState { } @Override - public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { + public long trimLog(final long desiredTrimIndex) { LOG.debug("trimLog should not be called in state {}", this); return -1; } - protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){ + protected long doTrimLog(final long desiredTrimIndex) { // we would want to keep the lastApplied as its used while capturing snapshots long lastApplied = context.getLastApplied(); long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); @@ -211,7 +211,10 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); context.getReplicatedLog().snapshotCommit(); return tempMin; - } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) { + } + + final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); + if(tempMin > currentBehavior.getReplicatedToAllIndex()) { // It's possible a follower was lagging and an install snapshot advanced its match index past // the current replicatedToAllIndex. Since the follower is now caught up we should advance the // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely @@ -286,15 +289,15 @@ public class SnapshotManager implements SnapshotState { } @Override - public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) { - return doTrimLog(desiredTrimIndex, currentBehavior); + public long trimLog(final long desiredTrimIndex) { + return doTrimLog(desiredTrimIndex); } } private class Creating extends AbstractSnapshotState { @Override - public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { + public void persist(final byte[] snapshotBytes, final long totalMemory) { // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. @@ -316,6 +319,7 @@ public class SnapshotManager implements SnapshotState { boolean logSizeExceededSnapshotBatchCount = context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount(); + final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) { if(LOG.isDebugEnabled()) { if(dataSizeThresholdExceeded) { @@ -381,7 +385,7 @@ public class SnapshotManager implements SnapshotState { private class Persisting extends AbstractSnapshotState { @Override - public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) { + public void commit(final long sequenceNumber) { LOG.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber); if(applySnapshot != null) { @@ -389,7 +393,7 @@ public class SnapshotManager implements SnapshotState { Snapshot snapshot = applySnapshot.getSnapshot(); //clears the followers log, sets the snapshot index to ensure adjusted-index works - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior)); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context)); context.setLastApplied(snapshot.getLastAppliedIndex()); context.setCommitIndex(snapshot.getLastAppliedIndex()); context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index f5a175f389..5d1304fe75 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.raft; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; public interface SnapshotState { /** @@ -52,14 +51,14 @@ public interface SnapshotState { * @param currentBehavior * @param totalMemory */ - void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory); + void persist(byte[] snapshotBytes, long totalMemory); /** * Commit the snapshot by trimming the log * * @param sequenceNumber */ - void commit(long sequenceNumber, RaftActorBehavior currentBehavior); + void commit(long sequenceNumber); /** * Rollback the snapshot @@ -72,5 +71,5 @@ public interface SnapshotState { * @param desiredTrimIndex * @return the actual trim index */ - long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior); + long trimLog(long desiredTrimIndex); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 5394d65e2a..237e5eeaae 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -495,7 +495,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param snapshotCapturedIndex */ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) { - long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this); + long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex); if(actualIndex != -1){ setReplicatedToAllIndex(actualIndex); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java deleted file mode 100644 index 175e16f21b..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.raft.behaviors; - -import akka.actor.ActorRef; -import org.opendaylight.controller.cluster.raft.RaftState; - -/** - * A RaftActorBehavior implementation that delegates to another implementation. - * - * @author Thomas Pantelis - */ -public class DelegatingRaftActorBehavior implements RaftActorBehavior { - private RaftActorBehavior delegate; - - public RaftActorBehavior getDelegate() { - return delegate; - } - - public void setDelegate(RaftActorBehavior delegate) { - this.delegate = delegate; - } - - @Override - public void close() throws Exception { - delegate.close(); - } - - @Override - public RaftActorBehavior handleMessage(ActorRef sender, Object message) { - return delegate.handleMessage(sender, message); - } - - @Override - public RaftState state() { - return delegate.state(); - } - - @Override - public String getLeaderId() { - return delegate.getLeaderId(); - } - - @Override - public void setReplicatedToAllIndex(long replicatedToAllIndex) { - delegate.setReplicatedToAllIndex(replicatedToAllIndex); - } - - @Override - public long getReplicatedToAllIndex() { - return delegate.getReplicatedToAllIndex(); - } - - @Override - public short getLeaderPayloadVersion() { - return delegate.getLeaderPayloadVersion(); - } - - @Override - public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { - return delegate.switchBehavior(behavior); - } -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 25d14a2b19..34572653c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -17,6 +17,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; @@ -318,4 +319,9 @@ public class MockRaftActorContext extends RaftActorContextImpl { return this.mockLog; } } + + @Override + public void setCurrentBehavior(final RaftActorBehavior behavior) { + super.setCurrentBehavior(behavior); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index 71ca4cae5d..b4497fd298 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -40,7 +40,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; -import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +56,6 @@ public class RaftActorRecoverySupportTest { @Mock private DataPersistenceProvider mockPersistence; - @Mock - private RaftActorBehavior mockBehavior; @Mock private RaftActorRecoveryCohort mockCohort; @@ -80,11 +77,11 @@ public class RaftActorRecoverySupportTest { context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG), -1, -1, Collections.emptyMap(), configParams, mockPersistence, LOG); - support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort); + support = new RaftActorRecoverySupport(context, mockCohort); doReturn(true).when(mockPersistence).isRecoveryApplicable(); - context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior)); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); } private void sendMessageToSupport(Object message) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index bdfb2b20c8..514346f1ab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -135,7 +135,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { public void testAddServerWithExistingFollower() throws Exception { LOG.info("testAddServerWithExistingFollower starting"); setupNewFollower(); - RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor); + RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor); followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries( 0, 3, 1).build()); followerActorContext.setCommitIndex(2); @@ -143,6 +143,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); TestActorRef leaderActor = actorFactory.createTestActor( MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), @@ -1441,7 +1442,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig())); } - private static RaftActorContext newFollowerContext(String id, TestActorRef actor) { + private static RaftActorContextImpl newFollowerContext(String id, TestActorRef actor) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index 94000d0712..32b2e465f6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -71,11 +71,11 @@ public class RaftActorSnapshotMessageSupportTest { } }; - support = new RaftActorSnapshotMessageSupport(context, mockBehavior, mockCohort); + support = new RaftActorSnapshotMessageSupport(context, mockCohort); doReturn(true).when(mockPersistence).isRecoveryApplicable(); - context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior)); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); } private void sendMessageToSupport(Object message) { @@ -109,7 +109,7 @@ public class RaftActorSnapshotMessageSupportTest { byte[] snapshot = {1,2,3,4,5}; sendMessageToSupport(new CaptureSnapshotReply(snapshot)); - verify(mockSnapshotManager).persist(same(snapshot), same(mockBehavior), anyLong()); + verify(mockSnapshotManager).persist(same(snapshot), anyLong()); } @Test @@ -118,7 +118,7 @@ public class RaftActorSnapshotMessageSupportTest { long sequenceNumber = 100; sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L))); - verify(mockSnapshotManager).commit(eq(sequenceNumber), same(mockBehavior)); + verify(mockSnapshotManager).commit(eq(sequenceNumber)); } @Test @@ -135,7 +135,7 @@ public class RaftActorSnapshotMessageSupportTest { sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT); - verify(mockSnapshotManager).commit(eq(-1L), same(mockBehavior)); + verify(mockSnapshotManager).commit(eq(-1L)); } @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index d37681e872..99b17fe8ab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -657,12 +657,12 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(), - leader, Runtime.getRuntime().totalMemory()); + Runtime.getRuntime().totalMemory()); assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader); + leaderActor.getRaftActorContext().getSnapshotManager().commit(-1); // capture snapshot reply should remove the snapshotted entries only assertEquals(3, leaderActor.getReplicatedLog().size()); @@ -765,7 +765,7 @@ public class RaftActorTest extends AbstractActorTest { assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process - followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower); + followerActor.getRaftActorContext().getSnapshotManager().commit(-1); // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java index 51cff356f2..8f674cca3b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java @@ -76,7 +76,7 @@ public class ReplicatedLogImplTest { @SuppressWarnings("unchecked") @Test public void testAppendAndPersistExpectingNoCapture() throws Exception { - ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior); + ReplicatedLog log = ReplicatedLogImpl.newInstance(context); MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1")); @@ -104,7 +104,7 @@ public class ReplicatedLogImplTest { doReturn(1L).when(mockBehavior).getReplicatedToAllIndex(); - ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior); + ReplicatedLog log = ReplicatedLogImpl.newInstance(context); MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2")); MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3")); @@ -132,7 +132,7 @@ public class ReplicatedLogImplTest { } }); - ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior); + ReplicatedLog log = ReplicatedLogImpl.newInstance(context); int dataSize = 600; MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize)); @@ -153,7 +153,7 @@ public class ReplicatedLogImplTest { @Test public void testRemoveFromAndPersist() throws Exception { - ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior); + ReplicatedLog log = ReplicatedLogImpl.newInstance(context); log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0"))); log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1"))); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 27596dff74..e6f7bd7b9e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -81,6 +81,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); doReturn("123").when(mockRaftActorContext).getId(); doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider(); + doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior(); doReturn("123").when(mockRaftActorBehavior).getLeaderId(); doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); @@ -247,7 +248,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.capture(lastLogEntry, -1); byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory()); ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); @@ -280,7 +281,7 @@ public class SnapshotManagerTest extends AbstractActorTest { new MockRaftActorContext.MockPayload()), 9); byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory()); ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); @@ -307,7 +308,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -333,7 +334,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), replicatedToAllIndex); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, 2000000L); + snapshotManager.persist(new byte[]{}, 2000000L); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -354,7 +355,7 @@ public class SnapshotManagerTest extends AbstractActorTest { byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory()); assertEquals(true, snapshotManager.isCapturing()); @@ -374,7 +375,7 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCallingPersistWithoutCaptureWillDoNothing(){ - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class)); @@ -390,9 +391,9 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -409,11 +410,11 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); assertEquals(true, snapshotManager.isCapturing()); - snapshotManager.commit(100L, mockRaftActorBehavior); + snapshotManager.commit(100L); assertEquals(false, snapshotManager.isCapturing()); @@ -438,7 +439,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.commit(100L, mockRaftActorBehavior); + snapshotManager.commit(100L); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -450,7 +451,7 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCommitBeforeCapture(){ - snapshotManager.commit(100L, mockRaftActorBehavior); + snapshotManager.commit(100L); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -468,11 +469,11 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); - snapshotManager.commit(100L, mockRaftActorBehavior); + snapshotManager.commit(100L); - snapshotManager.commit(100L, mockRaftActorBehavior); + snapshotManager.commit(100L); verify(mockReplicatedLog, times(1)).snapshotCommit(); @@ -487,7 +488,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); @@ -521,7 +522,7 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); @@ -539,7 +540,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", 10L, retIndex); verify(mockReplicatedLog).snapshotPreCommit(10, 5); @@ -557,7 +558,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", -1L, retIndex); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); @@ -575,7 +576,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", -1L, retIndex); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); @@ -590,7 +591,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(false).when(mockReplicatedLog).isPresent(10); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", -1L, retIndex); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); @@ -615,7 +616,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - snapshotManager.trimLog(10, mockRaftActorBehavior); + snapshotManager.trimLog(10); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -637,7 +638,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - snapshotManager.trimLog(10, mockRaftActorBehavior); + snapshotManager.trimLog(10); verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5); verify(mockReplicatedLog, never()).snapshotCommit(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java index 63d9643e3c..31f2d62964 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java @@ -25,7 +25,6 @@ import org.junit.After; import org.junit.Before; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -199,13 +198,14 @@ public class AbstractLeaderElectionScenarioTest { assertEquals(name + " behavior state", expState, actor.behavior.state()); } - void initializeLeaderBehavior(MemberActor actor, RaftActorContext context, int numActiveFollowers) throws Exception { + void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers) throws Exception { // Leader sends immediate heartbeats - we don't care about it so ignore it. actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers); - @SuppressWarnings("resource") Leader leader = new Leader(context); + context.setCurrentBehavior(leader); + actor.waitForExpectedMessages(AppendEntriesReply.class); // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior. actor.behavior = leader; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java index dd3ed2347c..003964a0a1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java @@ -25,7 +25,7 @@ import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorAc import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; -public abstract class AbstractLeaderTest extends AbstractRaftActorBehaviorTest{ +public abstract class AbstractLeaderTest extends AbstractRaftActorBehaviorTest { /** * When we removed scheduling of heartbeat in the AbstractLeader constructor we ended up with a situation where diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 496f7b071a..30ca63b063 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -40,7 +40,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.LoggerFactory; -public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { +public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { protected final TestActorFactory actorFactory = new TestActorFactory(getSystem()); @@ -66,7 +66,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { */ @Test public void testHandleRaftRPCWithNewerTerm() throws Exception { - RaftActorContext actorContext = createActorContext(); + MockRaftActorContext actorContext = createActorContext(); assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor, createAppendEntriesWithNewerTerm()); @@ -212,7 +212,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { */ @Test public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() { - RaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); context.getTermInformation().update(1000, null); @@ -272,11 +272,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { Payload p = new MockRaftActorContext.MockPayload(""); - setLastLogEntry((MockRaftActorContext) actorContext, 1, 0, p); + setLastLogEntry(actorContext, 1, 0, p); actorContext.getTermInformation().update(1, "test"); RaftActorBehavior origBehavior = createBehavior(actorContext); @@ -304,8 +304,13 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { return log; } - protected abstract RaftActorBehavior createBehavior( - RaftActorContext actorContext); + protected abstract T createBehavior(RaftActorContext actorContext); + + protected final T createBehavior(MockRaftActorContext actorContext) { + T ret = createBehavior((RaftActorContext)actorContext); + actorContext.setCurrentBehavior(ret); + return ret; + } protected RaftActorBehavior createBehavior() { return createBehavior(createActorContext()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 73e0a16515..6e5c931502 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -43,7 +43,7 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class CandidateTest extends AbstractRaftActorBehaviorTest { +public class CandidateTest extends AbstractRaftActorBehaviorTest { static final Logger LOG = LoggerFactory.getLogger(CandidateTest.class); private final TestActorRef candidateActor = actorFactory.createTestActor( @@ -325,7 +325,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { } @Override - protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { + protected Candidate createBehavior(final RaftActorContext actorContext) { return new Candidate(actorContext); } @@ -333,7 +333,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { return new MockRaftActorContext("candidate", getSystem(), candidateActor); } - private Map setupPeers(int count) { + private Map setupPeers(final int count) { Map peerMap = new HashMap<>(); peerActors = new TestActorRef[count]; for(int i = 0; i < count; i++) { @@ -346,8 +346,8 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, - ActorRef actorRef, RaftRPC rpc) throws Exception { + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, + final ActorRef actorRef, final RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java index 000456bd57..a1301adb7c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/DelayedMessagesElectionScenarioTest.java @@ -183,6 +183,7 @@ public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionS member2Context.setConfigParams(member2ConfigParams); member2Actor.behavior = new Follower(member2Context); + member2Context.setCurrentBehavior(member2Actor.behavior); // Create member 3's behavior initially as Follower @@ -195,6 +196,7 @@ public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionS member3Context.setConfigParams(member3ConfigParams); member3Actor.behavior = new Follower(member3Context); + member3Context.setCurrentBehavior(member3Actor.behavior); // Create member 1's behavior initially as Leader diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index d5d9e88098..c10ff00356 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -50,7 +50,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; -public class FollowerTest extends AbstractRaftActorBehaviorTest { +public class FollowerTest extends AbstractRaftActorBehaviorTest { private final TestActorRef followerActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower")); @@ -113,7 +113,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull"); - RaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); long term = 1000; context.getTermInformation().update(term, null); @@ -132,7 +132,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId"); - RaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); long term = 1000; context.getTermInformation().update(term, "test"); @@ -1034,7 +1034,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); @@ -1043,7 +1043,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef replyActor) + protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef replyActor) throws Exception { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); assertEquals("isSuccess", true, reply.isSuccess()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java index 0eb58d427a..a48efb21f3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java @@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -public class IsolatedLeaderTest extends AbstractLeaderTest { +public class IsolatedLeaderTest extends AbstractLeaderTest { private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); @@ -43,7 +43,7 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { } @Override - protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { + protected IsolatedLeader createBehavior(RaftActorContext actorContext) { return new IsolatedLeader(actorContext); } @@ -73,6 +73,7 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { leaderActorContext.setPeerAddresses(peerAddresses); isolatedLeader = new IsolatedLeader(leaderActorContext); + leaderActorContext.setCurrentBehavior(isolatedLeader); assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state()); // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated @@ -108,6 +109,7 @@ public class IsolatedLeaderTest extends AbstractLeaderTest { leaderActorContext.setPeerAddresses(peerAddresses); isolatedLeader = new IsolatedLeader(leaderActorContext); + leaderActorContext.setCurrentBehavior(isolatedLeader); assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state()); // in a 5 member cluster, atleast 2 followers need to be active and return a reply diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index b0d220a0d6..9d8144c439 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -64,7 +64,7 @@ import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorAc import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractLeaderTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; public static final String LEADER_ID = "leader"; @@ -112,6 +112,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(term, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader should send an immediate heartbeat with no entries as follower is inactive. long lastIndex = actorContext.getReplicatedLog().lastIndex(); @@ -208,6 +209,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(newTerm, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader will send an immediate heartbeat - ignore it. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -894,6 +896,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -962,6 +965,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -1190,8 +1194,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); } - @Override protected RaftActorBehavior createBehavior( - RaftActorContext actorContext) { + @Override + protected Leader createBehavior(final RaftActorContext actorContext) { return new Leader(actorContext); } @@ -1241,6 +1245,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1264,6 +1269,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActorContext.setCommitIndex(1); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1295,6 +1301,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map leaderPeerAddresses = new HashMap<>(); leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1462,6 +1469,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1477,6 +1485,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1541,6 +1550,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1556,6 +1566,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1738,6 +1749,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1753,6 +1765,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1964,6 +1977,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Ignore initial heartbeats MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2020,6 +2034,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2059,6 +2074,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2090,6 +2106,7 @@ public class LeaderTest extends AbstractLeaderTest { new FiniteDuration(200, TimeUnit.MILLISECONDS)); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2128,6 +2145,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2157,7 +2175,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedCandidateOnStartupElectionScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedCandidateOnStartupElectionScenarioTest.java index 376c01e500..a323aa78ae 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedCandidateOnStartupElectionScenarioTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedCandidateOnStartupElectionScenarioTest.java @@ -181,6 +181,7 @@ public class PartitionedCandidateOnStartupElectionScenarioTest extends AbstractL Candidate member3Behavior = new Candidate(member3Context); member3Actor.behavior = member3Behavior; + member3Context.setCurrentBehavior(member3Behavior); // Send several additional ElectionTimeouts to Candidate member 3. Each ElectionTimeout will // start a new term so Candidate member 3's current term will be greater than the leader's @@ -219,6 +220,7 @@ public class PartitionedCandidateOnStartupElectionScenarioTest extends AbstractL member2Context.setConfigParams(member2ConfigParams); member2Actor.behavior = new Follower(member2Context); + member2Context.setCurrentBehavior(member2Actor.behavior); // Create member 1's behavior as Leader. diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedLeadersElectionScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedLeadersElectionScenarioTest.java index 83f7a75c9c..11cc1f0a6d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedLeadersElectionScenarioTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/PartitionedLeadersElectionScenarioTest.java @@ -287,6 +287,7 @@ public class PartitionedLeadersElectionScenarioTest extends AbstractLeaderElecti member2Context.setConfigParams(member2ConfigParams); member2Actor.behavior = new Follower(member2Context); + member2Context.setCurrentBehavior(member2Actor.behavior); // Create member 3's behavior initially as Follower @@ -299,6 +300,7 @@ public class PartitionedLeadersElectionScenarioTest extends AbstractLeaderElecti member3Context.setConfigParams(member3ConfigParams); member3Actor.behavior = new Follower(member3Context); + member3Context.setCurrentBehavior(member3Actor.behavior); // Create member 1's behavior initially as Leader