The delegate is leaked through various methods, implementations of which already have access
to the current behavior if it were available from RaftActorContext. Simplify calling
conventions
Change-Id: I9e27f68e55f28a9afd446abff91fbb38dd26c011
Signed-off-by: Robert Varga <rovarga@cisco.com>
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
protected final Logger LOG = LoggerFactory.getLogger(getClass());
- /**
- * The current state determines the current behavior of a RaftActor
- * A Raft Actor always starts off in the Follower State
- */
- private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
-
/**
* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
@Override
@Override
public void postStop() {
- if(currentBehavior.getDelegate() != null) {
- try {
- currentBehavior.close();
- } catch (Exception e) {
- LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
- }
- }
-
+ context.close();
super.postStop();
}
}
protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
- return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
+ return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
}
- protected void initializeBehavior(){
+ @VisibleForTesting
+ void initializeBehavior(){
changeCurrentBehavior(new Follower(context));
}
+ @VisibleForTesting
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
if(getCurrentBehavior() != null) {
try {
// and recovery shows data missing
context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
- context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+ context.getSnapshotManager().trimLog(context.getLastApplied());
}
} else if (message instanceof ApplyJournalEntries) {
}
shuttingDown = true;
+
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+ if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
- currentBehavior.getLeaderPayloadVersion()), getSelf());
+ getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
- protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
- return new RaftActorSnapshotMessageSupport(context, currentBehavior,
- getRaftActorSnapshotCohort());
+ @VisibleForTesting
+ RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+ return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
}
private void onGetOnDemandRaftStats() {
peerAddresses.put(peerId, context.getPeerAddress(peerId));
}
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.peerAddresses(peerAddresses)
.customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
- ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+ ReplicatedLogEntry lastLogEntry = replicatedLog().last();
if (lastLogEntry != null) {
builder.lastLogIndex(lastLogEntry.getIndex());
builder.lastLogTerm(lastLogEntry.getTerm());
context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
// Send message for replication
- currentBehavior.handleMessage(getSelf(),
+ getCurrentBehavior().handleMessage(getSelf(),
new Replicate(clientActor, identifier, replicatedLogEntry));
}
}
@VisibleForTesting
void setCurrentBehavior(RaftActorBehavior behavior) {
- currentBehavior.setDelegate(behavior);
+ context.setCurrentBehavior(behavior);
}
protected RaftActorBehavior getCurrentBehavior() {
- return currentBehavior.getDelegate();
+ return context.getCurrentBehavior();
}
/**
* @return true it this RaftActor is a Leader false otherwise
*/
protected boolean isLeader() {
- return context.getId().equals(currentBehavior.getLeaderId());
+ return context.getId().equals(getCurrentBehavior().getLeaderId());
}
- protected boolean isLeaderActive() {
- return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+ protected final boolean isLeaderActive() {
+ return getRaftState() != RaftState.IsolatedLeader && !shuttingDown &&
!isLeadershipTransferInProgress();
}
*
* @return the current leader's id
*/
- protected String getLeaderId(){
- return currentBehavior.getLeaderId();
- }
-
- protected RaftState getRaftState() {
- return currentBehavior.state();
+ protected final String getLeaderId(){
+ return getCurrentBehavior().getLeaderId();
}
- protected ReplicatedLogEntry getLastLogEntry() {
- return replicatedLog().last();
+ @VisibleForTesting
+ protected final RaftState getRaftState() {
+ return getCurrentBehavior().state();
}
protected Long getCurrentTerm(){
return context.getTermInformation().getCurrentTerm();
}
- protected Long getCommitIndex(){
- return context.getCommitIndex();
- }
-
- protected Long getLastApplied(){
- return context.getLastApplied();
- }
-
protected RaftActorContext getRaftActorContext() {
return context;
}
// The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
// but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
// avoids potential disruption. Otherwise, switch to Follower normally.
- RaftActorBehavior behavior = currentBehavior.getDelegate();
+ RaftActorBehavior behavior = getCurrentBehavior();
if(behavior instanceof Follower) {
String previousLeaderId = ((Follower)behavior).getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
operation.run();
}
- protected void onLeaderChanged(String oldLeader, String newLeader){};
+ protected void onLeaderChanged(String oldLeader, String newLeader) {
+
+ };
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
}
- String leaderId = currentBehavior.getLeaderId();
+ String leaderId = getLeaderId();
if (leaderId == null) {
return null;
}
private void captureSnapshot() {
SnapshotManager snapshotManager = context.getSnapshotManager();
- if(!snapshotManager.isCapturing()) {
+ if (!snapshotManager.isCapturing()) {
+ final long idx = getCurrentBehavior().getReplicatedToAllIndex();
LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
- replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+ replicatedLog().last(), idx);
- snapshotManager.capture(replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+ snapshotManager.capture(replicatedLog().last(), idx);
}
}
if(this.message instanceof SwitchBehavior){
return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
}
- return currentBehavior.handleMessage(sender, message);
+ return getCurrentBehavior().handleMessage(sender, message);
}
}
}
import java.util.Collection;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
* @return true if this RaftActor is a voting member of the cluster, false otherwise.
*/
boolean isVotingMember();
+
+ /**
+ * @return current behavior attached to the raft actor.
+ */
+ RaftActorBehavior getCurrentBehavior();
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
private boolean votingMember = true;
+ private RaftActorBehavior currentBehavior;
+
public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
public boolean isVotingMember() {
return votingMember;
}
+
+ @Override
+ public RaftActorBehavior getCurrentBehavior() {
+ return currentBehavior;
+ }
+
+ void setCurrentBehavior(final RaftActorBehavior behavior) {
+ this.currentBehavior = Preconditions.checkNotNull(behavior);
+ }
+
+ void close() {
+ if (currentBehavior != null) {
+ try {
+ currentBehavior.close();
+ } catch (Exception e) {
+ LOG.debug("{}: Error closing behavior {}", getId(), currentBehavior.state());
+ }
+ }
+ }
}
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.slf4j.Logger;
/**
*/
class RaftActorRecoverySupport {
private final RaftActorContext context;
- private final RaftActorBehavior currentBehavior;
private final RaftActorRecoveryCohort cohort;
private int currentRecoveryBatchCount;
private Stopwatch recoveryTimer;
private final Logger log;
- RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
- RaftActorRecoveryCohort cohort) {
+ RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
this.context = context;
- this.currentBehavior = currentBehavior;
this.cohort = cohort;
this.log = context.getLogger();
}
// The replicated log can be used later on to retrieve this snapshot
// when we need to install it on a peer
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
- boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
+ boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
if(isSelf && !raftContext.hasFollowers()) {
sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
raftActor.getSelf());
// Sanity check - we could get an ApplyState from a previous operation that timed out so make
// sure it's meant for us.
if(operationContext.getContextId().equals(applyState.getIdentifier())) {
- LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
+ LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
applyState.getReplicatedLogEntry().getData());
timer.cancel();
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.slf4j.Logger;
static final String COMMIT_SNAPSHOT = "commit_snapshot";
private final RaftActorContext context;
- private final RaftActorBehavior currentBehavior;
private final RaftActorSnapshotCohort cohort;
private final Logger log;
private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS);
- RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
- RaftActorSnapshotCohort cohort) {
+ RaftActorSnapshotMessageSupport(final RaftActorContext context, final RaftActorSnapshotCohort cohort) {
this.context = context;
- this.currentBehavior = currentBehavior;
this.cohort = cohort;
this.log = context.getLogger();
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if (message.equals(COMMIT_SNAPSHOT)) {
- context.getSnapshotManager().commit(-1, currentBehavior);
+ context.getSnapshotManager().commit(-1);
} else if (message instanceof GetSnapshot) {
onGetSnapshot(sender);
} else {
private void onCaptureSnapshotReply(byte[] snapshotBytes) {
log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
- context.getSnapshotManager().persist(snapshotBytes, currentBehavior, context.getTotalMemory());
+ context.getSnapshotManager().persist(snapshotBytes, context.getTotalMemory());
}
private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
long sequenceNumber = success.metadata().sequenceNr();
- context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
+ context.getSnapshotManager().commit(sequenceNumber);
}
private void onApplySnapshot(ApplySnapshot message) {
import java.util.Collections;
import java.util.List;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
/**
* Implementation of ReplicatedLog used by the RaftActor.
private long dataSizeSinceLastSnapshot = 0L;
private final RaftActorContext context;
- private final RaftActorBehavior currentBehavior;
private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
@Override
}
};
- static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context,
- final RaftActorBehavior currentBehavior) {
+ static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) {
return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
- snapshot.getUnAppliedEntries(), context, currentBehavior);
+ snapshot.getUnAppliedEntries(), context);
}
- static ReplicatedLog newInstance(final RaftActorContext context, final RaftActorBehavior currentBehavior) {
- return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
- currentBehavior);
+ static ReplicatedLog newInstance(final RaftActorContext context) {
+ return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context);
}
private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List<ReplicatedLogEntry> unAppliedEntries,
- final RaftActorContext context, final RaftActorBehavior currentBehavior) {
+ final RaftActorContext context) {
super(snapshotIndex, snapshotTerm, unAppliedEntries);
this.context = Preconditions.checkNotNull(context);
- this.currentBehavior = Preconditions.checkNotNull(currentBehavior);
}
@Override
|| getDataSizeForSnapshotCheck() > dataThreshold)) {
boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
- currentBehavior.getReplicatedToAllIndex());
+ context.getCurrentBehavior().getReplicatedToAllIndex());
if (started) {
if (!context.hasFollowers()) {
dataSizeSinceLastSnapshot = 0;
}
@Override
- public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
- currentState.persist(snapshotBytes, currentBehavior, totalMemory);
+ public void persist(final byte[] snapshotBytes, final long totalMemory) {
+ currentState.persist(snapshotBytes, totalMemory);
}
@Override
- public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
- currentState.commit(sequenceNumber, currentBehavior);
+ public void commit(final long sequenceNumber) {
+ currentState.commit(sequenceNumber);
}
@Override
}
@Override
- public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
- return currentState.trimLog(desiredTrimIndex, currentBehavior);
+ public long trimLog(final long desiredTrimIndex) {
+ return currentState.trimLog(desiredTrimIndex);
}
public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
}
@Override
- public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+ public void persist(final byte[] snapshotBytes, final long totalMemory) {
LOG.debug("persist should not be called in state {}", this);
}
@Override
- public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+ public void commit(final long sequenceNumber) {
LOG.debug("commit should not be called in state {}", this);
}
}
@Override
- public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ public long trimLog(final long desiredTrimIndex) {
LOG.debug("trimLog should not be called in state {}", this);
return -1;
}
- protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+ protected long doTrimLog(final long desiredTrimIndex) {
// we would want to keep the lastApplied as its used while capturing snapshots
long lastApplied = context.getLastApplied();
long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
context.getReplicatedLog().snapshotCommit();
return tempMin;
- } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+ }
+
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+ if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
// It's possible a follower was lagging and an install snapshot advanced its match index past
// the current replicatedToAllIndex. Since the follower is now caught up we should advance the
// replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
}
@Override
- public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
- return doTrimLog(desiredTrimIndex, currentBehavior);
+ public long trimLog(final long desiredTrimIndex) {
+ return doTrimLog(desiredTrimIndex);
}
}
private class Creating extends AbstractSnapshotState {
@Override
- public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+ public void persist(final byte[] snapshotBytes, final long totalMemory) {
// create a snapshot object from the state provided and save it
// when snapshot is saved async, SaveSnapshotSuccess is raised.
boolean logSizeExceededSnapshotBatchCount =
context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
if(LOG.isDebugEnabled()) {
if(dataSizeThresholdExceeded) {
private class Persisting extends AbstractSnapshotState {
@Override
- public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+ public void commit(final long sequenceNumber) {
LOG.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
if(applySnapshot != null) {
Snapshot snapshot = applySnapshot.getSnapshot();
//clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
package org.opendaylight.controller.cluster.raft;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
public interface SnapshotState {
/**
* @param currentBehavior
* @param totalMemory
*/
- void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory);
+ void persist(byte[] snapshotBytes, long totalMemory);
/**
* Commit the snapshot by trimming the log
*
* @param sequenceNumber
*/
- void commit(long sequenceNumber, RaftActorBehavior currentBehavior);
+ void commit(long sequenceNumber);
/**
* Rollback the snapshot
* @param desiredTrimIndex
* @return the actual trim index
*/
- long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
+ long trimLog(long desiredTrimIndex);
}
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
if(actualIndex != -1){
setReplicatedToAllIndex(actualIndex);
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.raft.behaviors;
-
-import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.RaftState;
-
-/**
- * A RaftActorBehavior implementation that delegates to another implementation.
- *
- * @author Thomas Pantelis
- */
-public class DelegatingRaftActorBehavior implements RaftActorBehavior {
- private RaftActorBehavior delegate;
-
- public RaftActorBehavior getDelegate() {
- return delegate;
- }
-
- public void setDelegate(RaftActorBehavior delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void close() throws Exception {
- delegate.close();
- }
-
- @Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
- return delegate.handleMessage(sender, message);
- }
-
- @Override
- public RaftState state() {
- return delegate.state();
- }
-
- @Override
- public String getLeaderId() {
- return delegate.getLeaderId();
- }
-
- @Override
- public void setReplicatedToAllIndex(long replicatedToAllIndex) {
- delegate.setReplicatedToAllIndex(replicatedToAllIndex);
- }
-
- @Override
- public long getReplicatedToAllIndex() {
- return delegate.getReplicatedToAllIndex();
- }
-
- @Override
- public short getLeaderPayloadVersion() {
- return delegate.getLeaderPayloadVersion();
- }
-
- @Override
- public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
- return delegate.switchBehavior(behavior);
- }
-}
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
return this.mockLog;
}
}
+
+ @Override
+ public void setCurrentBehavior(final RaftActorBehavior behavior) {
+ super.setCurrentBehavior(behavior);
+ }
}
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Mock
private DataPersistenceProvider mockPersistence;
- @Mock
- private RaftActorBehavior mockBehavior;
@Mock
private RaftActorRecoveryCohort mockCohort;
context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", LOG),
-1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
- support = new RaftActorRecoverySupport(context, mockBehavior , mockCohort);
+ support = new RaftActorRecoverySupport(context, mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
private void sendMessageToSupport(Object message) {
public void testAddServerWithExistingFollower() throws Exception {
LOG.info("testAddServerWithExistingFollower starting");
setupNewFollower();
- RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
+ RaftActorContextImpl followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
0, 3, 1).build());
followerActorContext.setCommitIndex(2);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
}
- private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
+ private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
}
};
- support = new RaftActorSnapshotMessageSupport(context, mockBehavior, mockCohort);
+ support = new RaftActorSnapshotMessageSupport(context, mockCohort);
doReturn(true).when(mockPersistence).isRecoveryApplicable();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, mockBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
private void sendMessageToSupport(Object message) {
byte[] snapshot = {1,2,3,4,5};
sendMessageToSupport(new CaptureSnapshotReply(snapshot));
- verify(mockSnapshotManager).persist(same(snapshot), same(mockBehavior), anyLong());
+ verify(mockSnapshotManager).persist(same(snapshot), anyLong());
}
@Test
long sequenceNumber = 100;
sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
- verify(mockSnapshotManager).commit(eq(sequenceNumber), same(mockBehavior));
+ verify(mockSnapshotManager).commit(eq(sequenceNumber));
}
@Test
sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
- verify(mockSnapshotManager).commit(eq(-1L), same(mockBehavior));
+ verify(mockSnapshotManager).commit(eq(-1L));
}
@Test
new MockRaftActorContext.MockPayload("foo-4")));
leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
- leader, Runtime.getRuntime().totalMemory());
+ Runtime.getRuntime().totalMemory());
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
@SuppressWarnings("unchecked")
@Test
public void testAppendAndPersistExpectingNoCapture() throws Exception {
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
doReturn(1L).when(mockBehavior).getReplicatedToAllIndex();
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
}
});
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
int dataSize = 600;
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
@Test
public void testRemoveFromAndPersist() throws Exception {
- ReplicatedLog log = ReplicatedLogImpl.newInstance(context, mockBehavior);
+ ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
log.append(new MockReplicatedLogEntry(1, 0, new MockPayload("0")));
log.append(new MockReplicatedLogEntry(1, 1, new MockPayload("1")));
doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
doReturn("123").when(mockRaftActorContext).getId();
doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
+ doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior();
doReturn("123").when(mockRaftActorBehavior).getLeaderId();
doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
snapshotManager.capture(lastLogEntry, -1);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
new MockRaftActorContext.MockPayload()), 9);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, 2000000L);
+ snapshotManager.persist(new byte[]{}, 2000000L);
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
assertEquals(true, snapshotManager.isCapturing());
@Test
public void testCallingPersistWithoutCaptureWillDoNothing(){
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
assertEquals(true, snapshotManager.isCapturing());
- snapshotManager.commit(100L, mockRaftActorBehavior);
+ snapshotManager.commit(100L);
assertEquals(false, snapshotManager.isCapturing());
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.commit(100L, mockRaftActorBehavior);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testCommitBeforeCapture(){
- snapshotManager.commit(100L, mockRaftActorBehavior);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, never()).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(100L, mockRaftActorBehavior);
+ snapshotManager.commit(100L);
- snapshotManager.commit(100L, mockRaftActorBehavior);
+ snapshotManager.commit(100L);
verify(mockReplicatedLog, times(1)).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", 10L, retIndex);
verify(mockReplicatedLog).snapshotPreCommit(10, 5);
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", -1L, retIndex);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", -1L, retIndex);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
doReturn(false).when(mockReplicatedLog).isPresent(10);
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", -1L, retIndex);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10, mockRaftActorBehavior);
+ snapshotManager.trimLog(10);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
verify(mockReplicatedLog, never()).snapshotCommit();
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10, mockRaftActorBehavior);
+ snapshotManager.trimLog(10);
verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
verify(mockReplicatedLog, never()).snapshotCommit();
import org.junit.Before;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
assertEquals(name + " behavior state", expState, actor.behavior.state());
}
- void initializeLeaderBehavior(MemberActor actor, RaftActorContext context, int numActiveFollowers) throws Exception {
+ void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers) throws Exception {
// Leader sends immediate heartbeats - we don't care about it so ignore it.
actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
- @SuppressWarnings("resource")
Leader leader = new Leader(context);
+ context.setCurrentBehavior(leader);
+
actor.waitForExpectedMessages(AppendEntriesReply.class);
// Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
actor.behavior = leader;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.duration.FiniteDuration;
-public abstract class AbstractLeaderTest extends AbstractRaftActorBehaviorTest{
+public abstract class AbstractLeaderTest<T extends AbstractLeader> extends AbstractRaftActorBehaviorTest<T> {
/**
* When we removed scheduling of heartbeat in the AbstractLeader constructor we ended up with a situation where
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.LoggerFactory;
-public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
+public abstract class AbstractRaftActorBehaviorTest<T extends RaftActorBehavior> extends AbstractActorTest {
protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
*/
@Test
public void testHandleRaftRPCWithNewerTerm() throws Exception {
- RaftActorContext actorContext = createActorContext();
+ MockRaftActorContext actorContext = createActorContext();
assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
createAppendEntriesWithNewerTerm());
*/
@Test
public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
- RaftActorContext context = createActorContext();
+ MockRaftActorContext context = createActorContext();
context.getTermInformation().update(1000, null);
}
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
Payload p = new MockRaftActorContext.MockPayload("");
- setLastLogEntry((MockRaftActorContext) actorContext, 1, 0, p);
+ setLastLogEntry(actorContext, 1, 0, p);
actorContext.getTermInformation().update(1, "test");
RaftActorBehavior origBehavior = createBehavior(actorContext);
return log;
}
- protected abstract RaftActorBehavior createBehavior(
- RaftActorContext actorContext);
+ protected abstract T createBehavior(RaftActorContext actorContext);
+
+ protected final T createBehavior(MockRaftActorContext actorContext) {
+ T ret = createBehavior((RaftActorContext)actorContext);
+ actorContext.setCurrentBehavior(ret);
+ return ret;
+ }
protected RaftActorBehavior createBehavior() {
return createBehavior(createActorContext());
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CandidateTest extends AbstractRaftActorBehaviorTest {
+public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
static final Logger LOG = LoggerFactory.getLogger(CandidateTest.class);
private final TestActorRef<MessageCollectorActor> candidateActor = actorFactory.createTestActor(
}
@Override
- protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ protected Candidate createBehavior(final RaftActorContext actorContext) {
return new Candidate(actorContext);
}
return new MockRaftActorContext("candidate", getSystem(), candidateActor);
}
- private Map<String, String> setupPeers(int count) {
+ private Map<String, String> setupPeers(final int count) {
Map<String, String> peerMap = new HashMap<>();
peerActors = new TestActorRef[count];
for(int i = 0; i < count; i++) {
}
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) throws Exception {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
member2Context.setConfigParams(member2ConfigParams);
member2Actor.behavior = new Follower(member2Context);
+ member2Context.setCurrentBehavior(member2Actor.behavior);
// Create member 3's behavior initially as Follower
member3Context.setConfigParams(member3ConfigParams);
member3Actor.behavior = new Follower(member3Context);
+ member3Context.setCurrentBehavior(member3Actor.behavior);
// Create member 1's behavior initially as Leader
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.duration.FiniteDuration;
-public class FollowerTest extends AbstractRaftActorBehaviorTest {
+public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
- RaftActorContext context = createActorContext();
+ MockRaftActorContext context = createActorContext();
long term = 1000;
context.getTermInformation().update(term, null);
public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
- RaftActorContext context = createActorContext();
+ MockRaftActorContext context = createActorContext();
long term = 1000;
context.getTermInformation().update(term, "test");
}
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
}
@Override
- protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
+ protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
throws Exception {
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
assertEquals("isSuccess", true, reply.isSuccess());
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-public class IsolatedLeaderTest extends AbstractLeaderTest {
+public class IsolatedLeaderTest extends AbstractLeaderTest<IsolatedLeader> {
private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
}
@Override
- protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+ protected IsolatedLeader createBehavior(RaftActorContext actorContext) {
return new IsolatedLeader(actorContext);
}
leaderActorContext.setPeerAddresses(peerAddresses);
isolatedLeader = new IsolatedLeader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(isolatedLeader);
assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state());
// in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
leaderActorContext.setPeerAddresses(peerAddresses);
isolatedLeader = new IsolatedLeader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(isolatedLeader);
assertEquals("Raft state", RaftState.IsolatedLeader, isolatedLeader.state());
// in a 5 member cluster, atleast 2 followers need to be active and return a reply
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.duration.FiniteDuration;
-public class LeaderTest extends AbstractLeaderTest {
+public class LeaderTest extends AbstractLeaderTest<Leader> {
static final String FOLLOWER_ID = "follower";
public static final String LEADER_ID = "leader";
actorContext.getTermInformation().update(term, "");
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
// Leader should send an immediate heartbeat with no entries as follower is inactive.
long lastIndex = actorContext.getReplicatedLog().lastIndex();
actorContext.getTermInformation().update(newTerm, "");
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
// Leader will send an immediate heartbeat - ignore it.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
actorContext.setCommitIndex(commitIndex);
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
}
- @Override protected RaftActorBehavior createBehavior(
- RaftActorContext actorContext) {
+ @Override
+ protected Leader createBehavior(final RaftActorContext actorContext) {
return new Leader(actorContext);
}
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
followerActorContext.setCommitIndex(1);
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
Map<String, String> leaderPeerAddresses = new HashMap<>();
leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
leader = new Leader(leaderActorContext);
assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
leader.handleMessage(followerActor, appendEntriesReply);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
leader = new Leader(leaderActorContext);
assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
leader.handleMessage(followerActor, appendEntriesReply);
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
+ followerActorContext.setCurrentBehavior(follower);
leader = new Leader(leaderActorContext);
assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
leaderActor.underlyingActor().setBehavior(leader);
+ leaderActorContext.setCurrentBehavior(leader);
leader.handleMessage(followerActor, appendEntriesReply);
leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Ignore initial heartbeats
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
new FiniteDuration(200, TimeUnit.MILLISECONDS));
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
// Initial heartbeat
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
}
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
Candidate member3Behavior = new Candidate(member3Context);
member3Actor.behavior = member3Behavior;
+ member3Context.setCurrentBehavior(member3Behavior);
// Send several additional ElectionTimeouts to Candidate member 3. Each ElectionTimeout will
// start a new term so Candidate member 3's current term will be greater than the leader's
member2Context.setConfigParams(member2ConfigParams);
member2Actor.behavior = new Follower(member2Context);
+ member2Context.setCurrentBehavior(member2Actor.behavior);
// Create member 1's behavior as Leader.
member2Context.setConfigParams(member2ConfigParams);
member2Actor.behavior = new Follower(member2Context);
+ member2Context.setCurrentBehavior(member2Actor.behavior);
// Create member 3's behavior initially as Follower
member3Context.setConfigParams(member3ConfigParams);
member3Actor.behavior = new Follower(member3Context);
+ member3Context.setCurrentBehavior(member3Actor.behavior);
// Create member 1's behavior initially as Leader