From 2b0c99463883b10d5eacdec901d7543d5815a54f Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Tue, 4 Nov 2014 16:50:26 -0800 Subject: [PATCH] Bug-2301 - Clustering:Snapshots need not be stored in in-mem ReplicatedLog for Installing snapshots 1. Snapshots, once persisted to disk, would not be stored as part of ReplicatedLog in memory 2. An Install snapshot would initiate a snapshot capture and be sent to the follower 3. FollowerLogInformation in Leader has a stopwatch which helps to find if a given follower is active/up. Rebased Change-Id: If7aac2518a0f624a0cc121112ce165456d002a18 Signed-off-by: Kamal Rameshan --- .../cluster/example/ExampleActor.java | 12 +- .../example/ExampleConfigParamsImpl.java | 2 +- .../raft/AbstractReplicatedLogImpl.java | 30 +- .../cluster/raft/FollowerLogInformation.java | 11 + .../raft/FollowerLogInformationImpl.java | 25 +- .../controller/cluster/raft/RaftActor.java | 23 +- .../cluster/raft/ReplicatedLog.java | 19 +- .../raft/base/messages/CaptureSnapshot.java | 11 + .../messages/InitiateInstallSnapshot.java | 16 + .../base/messages/SendInstallSnapshot.java | 13 +- .../cluster/raft/behaviors/Leader.java | 250 +++++--- .../raft/FollowerLogInformationImplTest.java | 66 +++ .../cluster/raft/RaftActorTest.java | 44 +- .../cluster/raft/behaviors/LeaderTest.java | 543 +++++++++++------- .../src/test/resources/application.conf | 2 +- 15 files changed, 718 insertions(+), 349 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/InitiateInstallSnapshot.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 06538fd2ae..03d24fad02 
100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -20,7 +20,9 @@ import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import java.io.ByteArrayInputStream; @@ -77,7 +79,15 @@ public class ExampleActor extends RaftActor { } else if (message instanceof PrintRole) { if(LOG.isDebugEnabled()) { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + String followers = ""; + if (getRaftState() == RaftState.Leader) { + followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); + LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers); + } else { + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + } + + } } else { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java index 6192cad230..2faae48838 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java @@ -15,7 +15,7 @@ import 
org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl { @Override public long getSnapshotBatchCount() { - return 50; + return 25; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index 2be4a0c36f..a2c9d660ad 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,8 +7,6 @@ */ package org.opendaylight.controller.cluster.raft; -import com.google.protobuf.ByteString; - import java.util.ArrayList; import java.util.List; @@ -20,27 +18,23 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { // We define this as ArrayList so we can use ensureCapacity. 
protected ArrayList journal; - protected ByteString snapshot; + protected long snapshotIndex = -1; protected long snapshotTerm = -1; // to be used for rollback during save snapshot failure protected ArrayList snapshottedJournal; - protected ByteString previousSnapshot; protected long previousSnapshotIndex = -1; protected long previousSnapshotTerm = -1; - public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex, + public AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List unAppliedEntries) { - this.snapshot = state; this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; this.journal = new ArrayList<>(unAppliedEntries); } - public AbstractReplicatedLogImpl() { - this.snapshot = null; this.journal = new ArrayList<>(); } @@ -154,11 +148,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return logEntryIndex <= snapshotIndex && snapshotIndex != -1; } - @Override - public ByteString getSnapshot() { - return snapshot; - } - @Override public long getSnapshotIndex() { return snapshotIndex; @@ -185,18 +174,13 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { this.snapshotTerm = snapshotTerm; } - @Override - public void setSnapshot(ByteString snapshot) { - this.snapshot = snapshot; - } - @Override public void clear(int startIndex, int endIndex) { journal.subList(startIndex, endIndex).clear(); } @Override - public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) { + public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) { snapshottedJournal = new ArrayList<>(journal.size()); snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex))); @@ -207,9 +191,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { previousSnapshotTerm = snapshotTerm; setSnapshotTerm(snapshotCapturedTerm); - - previousSnapshot = getSnapshot(); - setSnapshot(snapshot); } 
@Override @@ -217,7 +198,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { snapshottedJournal = null; previousSnapshotIndex = -1; previousSnapshotTerm = -1; - previousSnapshot = null; } @Override @@ -231,9 +211,5 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { snapshotTerm = previousSnapshotTerm; previousSnapshotTerm = -1; - - snapshot = previousSnapshot; - previousSnapshot = null; - } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index f3de983538..2c4304d404 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -61,5 +61,16 @@ public interface FollowerLogInformation { */ public AtomicLong getMatchIndex(); + /** + * Checks if the follower is active by comparing the last updated with the duration + * @return boolean + */ + public boolean isFollowerActive(); + + /** + * restarts the timeout clock of the follower + */ + public void markFollowerActive(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 94f9a53a85..c0cfd7e862 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -8,6 +8,10 @@ package org.opendaylight.controller.cluster.raft; +import com.google.common.base.Stopwatch; +import 
scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class FollowerLogInformationImpl implements FollowerLogInformation{ @@ -18,11 +22,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation{ private final AtomicLong matchIndex; + private final Stopwatch stopwatch; + + private final long followerTimeoutMillis; + public FollowerLogInformationImpl(String id, AtomicLong nextIndex, - AtomicLong matchIndex) { + AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) { this.id = id; this.nextIndex = nextIndex; this.matchIndex = matchIndex; + this.stopwatch = new Stopwatch(); + this.followerTimeoutMillis = followerTimeoutDuration.toMillis(); } public long incrNextIndex(){ @@ -57,4 +67,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation{ return matchIndex; } + @Override + public boolean isFollowerActive() { + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis); + } + + @Override + public void markFollowerActive() { + if (stopwatch.isRunning()) { + stopwatch.reset(); + } + stopwatch.start(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 2459c2ff8b..f02b52beb9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -18,6 +18,7 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; 
import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; @@ -30,6 +31,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; @@ -388,6 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } public java.util.Set getPeers() { + return context.getPeerAddresses().keySet(); } @@ -636,7 +640,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { //be greedy and remove entries from in-mem journal which are in the snapshot // and update snapshotIndex and snapshotTerm without waiting for the success, - context.getReplicatedLog().snapshotPreCommit(stateInBytes, + context.getReplicatedLog().snapshotPreCommit( captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); @@ -644,16 +648,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { "and term:{}", captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { + // this would be call straight to the leader and won't initiate in serialization + currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes)); + } + captureSnapshot = null; hasSnapshotCaptureInitiated = false; } - private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public 
ReplicatedLogImpl(Snapshot snapshot) { - super(ByteString.copyFrom(snapshot.getState()), - snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), + super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), snapshot.getUnAppliedEntries()); } @@ -843,4 +850,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + @VisibleForTesting + void setCurrentBehavior(AbstractRaftActorBehavior behavior) { + currentBehavior = behavior; + } + + protected RaftActorBehavior getCurrentBehavior() { + return currentBehavior; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 85893333c2..7ee85322a6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.raft; -import com.google.protobuf.ByteString; - import java.util.List; /** @@ -122,13 +120,6 @@ public interface ReplicatedLog { */ boolean isInSnapshot(long index); - /** - * Get the snapshot - * - * @return an object representing the snapshot if it exists. 
null otherwise - */ - ByteString getSnapshot(); - /** * Get the index of the snapshot * @@ -156,12 +147,6 @@ public interface ReplicatedLog { */ public void setSnapshotTerm(long snapshotTerm); - /** - * sets the snapshot in bytes - * @param snapshot - */ - public void setSnapshot(ByteString snapshot); - /** * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive) * @param startIndex @@ -172,12 +157,10 @@ public interface ReplicatedLog { /** * Handles all the bookkeeping in order to perform a rollback in the * event of SaveSnapshotFailure - * @param snapshot * @param snapshotCapturedIndex * @param snapshotCapturedTerm */ - public void snapshotPreCommit(ByteString snapshot, - long snapshotCapturedIndex, long snapshotCapturedTerm); + public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm); /** * Sets the Replicated log to state after snapshot success. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index bb86e1a37d..d4dd3350f3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -13,13 +13,20 @@ public class CaptureSnapshot { private long lastAppliedTerm; private long lastIndex; private long lastTerm; + private boolean installSnapshotInitiated; public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { + this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false); + } + + public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex, + long lastAppliedTerm, boolean installSnapshotInitiated) { this.lastIndex = lastIndex; 
this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; this.lastAppliedTerm = lastAppliedTerm; + this.installSnapshotInitiated = installSnapshotInitiated; } public long getLastAppliedIndex() { @@ -37,4 +44,8 @@ public class CaptureSnapshot { public long getLastTerm() { return lastTerm; } + + public boolean isInstallSnapshotInitiated() { + return installSnapshotInitiated; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/InitiateInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/InitiateInstallSnapshot.java new file mode 100644 index 0000000000..7844914873 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/InitiateInstallSnapshot.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.base.messages; + +/** + * Internal message by Leader to initiate an install snapshot + */ +public class InitiateInstallSnapshot { +} + diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java index 6c3313f316..83c85d9135 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java @@ -8,7 +8,16 @@ package 
org.opendaylight.controller.cluster.raft.base.messages; -import java.io.Serializable; +import com.google.protobuf.ByteString; -public class SendInstallSnapshot implements Serializable { +public class SendInstallSnapshot { + private ByteString snapshot; + + public SendInstallSnapshot(ByteString snapshot) { + this.snapshot = snapshot; + } + + public ByteString getSnapshot() { + return snapshot; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index de748675a7..ef104e7f58 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; @@ -20,6 +22,8 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import 
org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -66,8 +70,7 @@ import java.util.concurrent.atomic.AtomicLong; public class Leader extends AbstractRaftActorBehavior { - protected final Map followerToLog = - new HashMap(); + protected final Map followerToLog = new HashMap(); protected final Map mapFollowerToSnapshot = new HashMap<>(); private final Set followers; @@ -79,6 +82,8 @@ public class Leader extends AbstractRaftActorBehavior { private final int minReplicationCount; + private Optional snapshot; + public Leader(RaftActorContext context) { super(context); @@ -88,7 +93,8 @@ public class Leader extends AbstractRaftActorBehavior { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, new AtomicLong(context.getCommitIndex()), - new AtomicLong(-1)); + new AtomicLong(-1), + context.getConfigParams().getElectionTimeOutInterval()); followerToLog.put(followerId, followerLogInformation); } @@ -103,6 +109,7 @@ public class Leader extends AbstractRaftActorBehavior { minReplicationCount = 0; } + snapshot = Optional.absent(); // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs @@ -117,6 +124,15 @@ public class Leader extends AbstractRaftActorBehavior { } + private Optional getSnapshot() { + return snapshot; + } + + @VisibleForTesting + void setSnapshot(Optional snapshot) { + this.snapshot = snapshot; + } + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -146,6 +162,8 @@ public class Leader extends AbstractRaftActorBehavior { return this; } + followerLogInformation.markFollowerActive(); + if (appendEntriesReply.isSuccess()) { followerLogInformation .setMatchIndex(appendEntriesReply.getLogLastIndex()); @@ -246,10 +264,18 @@ public class Leader extends AbstractRaftActorBehavior { if (message instanceof SendHeartBeat) { sendHeartBeat(); return this; - } else if(message instanceof SendInstallSnapshot) { + + } else 
if(message instanceof InitiateInstallSnapshot) { installSnapshotIfNeeded(); + + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + sendInstallSnapshot(); + } else if (message instanceof Replicate) { replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ handleInstallSnapshotReply( (InstallSnapshotReply) message); @@ -263,8 +289,9 @@ public class Leader extends AbstractRaftActorBehavior { private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); - FollowerToSnapshot followerToSnapshot = - mapFollowerToSnapshot.get(followerId); + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + followerLogInformation.markFollowerActive(); if (followerToSnapshot != null && followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { @@ -280,8 +307,6 @@ public class Leader extends AbstractRaftActorBehavior { ); } - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); followerLogInformation.setMatchIndex( context.getReplicatedLog().getSnapshotIndex()); followerLogInformation.setNextIndex( @@ -293,6 +318,12 @@ public class Leader extends AbstractRaftActorBehavior { followerToLog.get(followerId).getNextIndex().get()); } + if (mapFollowerToSnapshot.isEmpty()) { + // once there are no pending followers receiving snapshots + // we can remove snapshot from the memory + setSnapshot(Optional.absent()); + } + } else { followerToSnapshot.markSendStatus(true); } @@ -344,64 +375,87 @@ public class Leader extends AbstractRaftActorBehavior { if (followerActor != null) { FollowerLogInformation followerLogInformation = followerToLog.get(followerId); long followerNextIndex = followerLogInformation.getNextIndex().get(); - List entries = Collections.emptyList(); + boolean 
isFollowerActive = followerLogInformation.isFollowerActive(); + List entries = null; if (mapFollowerToSnapshot.get(followerId) != null) { - if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + // if an install snapshot is in progress, then send the next chunk if possible + if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); + } else { + // we send a heartbeat even if we have not received a reply for the last chunk + sendAppendEntriesToFollower(followerActor, followerNextIndex, + Collections.emptyList()); } } else { + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if (context.getReplicatedLog().isPresent(followerNextIndex)) { + if (isFollowerActive && + context.getReplicatedLog().isPresent(followerNextIndex)) { // FIXME : Sending one entry at a time entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); - - } else { - // if the followers next index is not present in the leaders log, then snapshot should be sent - long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - long leaderLastIndex = context.getReplicatedLog().lastIndex(); - if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) { - // if the follower is just not starting and leader's index - // is more than followers index - if(LOG.isDebugEnabled()) { - LOG.debug("SendInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); - } - - actor().tell(new SendInstallSnapshot(), actor()); - } else { - followerActor.tell( - new
AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() + } else if (isFollowerActive && followerNextIndex >= 0 && + leaderLastIndex >= followerNextIndex ) { + // if the follower's next index is not present in the leader's log, and + // the follower is not just starting up (next index >= 0) and the leader's last index is >= the follower's next index, + // then a snapshot should be sent + + if(LOG.isDebugEnabled()) { + LOG.debug("InitiateInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex ); } + actor().tell(new InitiateInstallSnapshot(), actor()); + + // we still want to send an AppendEntries, as the snapshot capture might take time + entries = Collections.emptyList(); + + } else { + //we send an AppendEntries, even if the follower is inactive + // in order to update the follower's timestamp, in case it becomes active again + entries = Collections.emptyList(); } + + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); + } } } } + private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, + List entries) { + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } + /** * An installSnapshot is scheduled at a interval that is a multiple of * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing * snapshots at every heartbeat. + * + * Install Snapshot works as follows + * 1. Leader sends an InitiateInstallSnapshot message to self + * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor + * 3. 
RaftActor on receipt of the CaptureSnapshotReply (from Shard), hands the captured snapshot to the Leader + * by calling Leader's handleMessage with a SendInstallSnapshot message. + * 4. Leader keeps the snapshot in memory (not in the ReplicatedLog) and sends it in chunks to the Follower + * 5. On completion, Follower sends back an InstallSnapshotReply. + * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower + * and, once no installs are pending, frees the memory by discarding its in-memory snapshot. + * */ - private void installSnapshotIfNeeded(){ + private void installSnapshotIfNeeded() { for (String followerId : followers) { ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -412,6 +466,58 @@ public class Leader extends AbstractRaftActorBehavior { long nextIndex = followerLogInformation.getNextIndex().get(); + if (!context.getReplicatedLog().isPresent(nextIndex) && + context.getReplicatedLog().isInSnapshot(nextIndex)) { + LOG.info("{} follower needs a snapshot install", followerId); + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot + sendSnapshotChunk(followerActor, followerId); + + } else { + initiateCaptureSnapshot(); + //we just need 1 follower who would need snapshot to be installed. + // when we have the snapshot captured, we would again check (in SendInstallSnapshot) + // who needs an install and send to all who need + break; + } + + } + } + } + } + + // on every install snapshot, we try to capture the snapshot. + // Once a capture is going on, another one issued will get ignored by RaftActor. 
+ private void initiateCaptureSnapshot() { + LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId()); + ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); + lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); + } + + boolean isInstallSnapshotInitiated = true; + actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), + lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), + actor()); + } + + + private void sendInstallSnapshot() { + for (String followerId : followers) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + + if(followerActor != null) { + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + long nextIndex = followerLogInformation.getNextIndex().get(); + if (!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, followerId); @@ -426,20 +532,21 @@ public class Leader extends AbstractRaftActorBehavior { */ private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId, - context.getReplicatedLog().getSnapshot()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() - ).toSerializable(), - actor() - ); - LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", - followerActor.path(), 
mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + if (snapshot.isPresent()) { + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + getNextSnapshotChunk(followerId,snapshot.get()), + mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks() + ).toSerializable(), + actor() + ); + LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}", + followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks()); + } } catch (IOException e) { LOG.error(e, "InstallSnapshot failed for Leader."); } @@ -456,10 +563,9 @@ public class Leader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.put(followerId, followerToSnapshot); } ByteString nextChunk = followerToSnapshot.getNextChunk(); - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); } - return nextChunk; } @@ -495,14 +601,11 @@ public class Leader extends AbstractRaftActorBehavior { // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. 
- heartbeatSchedule = - context.getActorSystem().scheduler().scheduleOnce( - interval, - context.getActor(), new SendHeartBeat(), - context.getActorSystem().dispatcher(), context.getActor()); + heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( + interval, context.getActor(), new SendHeartBeat(), + context.getActorSystem().dispatcher(), context.getActor()); } - private void scheduleInstallSnapshotCheck(FiniteDuration interval) { if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are @@ -517,7 +620,7 @@ public class Leader extends AbstractRaftActorBehavior { installSnapshotSchedule = context.getActorSystem().scheduler().scheduleOnce( interval, - context.getActor(), new SendInstallSnapshot(), + context.getActor(), new InitiateInstallSnapshot(), context.getActorSystem().dispatcher(), context.getActor()); } @@ -628,4 +731,19 @@ public class Leader extends AbstractRaftActorBehavior { } } + // called from example-actor for printing the follower-states + public String printFollowerStates() { + StringBuilder sb = new StringBuilder(); + for(FollowerLogInformation followerLogInformation : followerToLog.values()) { + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},"); + + } + return "[" + sb.toString() + "]"; + } + + @VisibleForTesting + void markFollowerActive(String followerId) { + followerToLog.get(followerId).markFollowerActive(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java new file mode 100644 index 0000000000..7df9f3713f --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java @@ -0,0 +1,66 @@ +/* + 
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + + +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FollowerLogInformationImplTest { + + @Test + public void testIsFollowerActive() { + + FiniteDuration timeoutDuration = + new FiniteDuration(500, TimeUnit.MILLISECONDS); + + FollowerLogInformation followerLogInformation = + new FollowerLogInformationImpl( + "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration); + + + + assertFalse("Follower should be termed inactive before stopwatch starts", + followerLogInformation.isFollowerActive()); + + followerLogInformation.markFollowerActive(); + if (sleepWithElaspsedTimeReturned(200) > 200) { + return; + } + assertTrue("Follower should be active", followerLogInformation.isFollowerActive()); + + if (sleepWithElaspsedTimeReturned(400) > 400) { + return; + } + assertFalse("Follower should be inactive after time lapsed", + followerLogInformation.isFollowerActive()); + + followerLogInformation.markFollowerActive(); + assertTrue("Follower should be active from inactive", + followerLogInformation.isFollowerActive()); + } + + // we cannot rely comfortably that the sleep will indeed sleep for the desired time + // hence getting the actual elapsed time and do a match. 
+ // if the sleep has spilled over, then return the test gracefully + private long sleepWithElaspsedTimeReturned(long millis) { + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS); + stopwatch.stop(); + return stopwatch.elapsed(TimeUnit.MILLISECONDS); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index ca864eb426..87e40f236c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -31,6 +31,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.Follower; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -557,7 +559,6 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, @@ -600,7 +601,6 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider, 
times(2)).persist(anyObject(), any(Procedure.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); } @@ -646,8 +646,9 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId,Collections.EMPTY_MAP, + Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -659,6 +660,10 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); verify(dataPersistenceProvider).saveSnapshot(anyObject()); @@ -698,6 +703,9 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); verify(mockRaftActor.delegate).createSnapshot(); @@ -710,8 +718,6 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).deleteMessages(100); - assertNotNull("Snapshot should not be null", mockRaftActor.getReplicatedLog().getSnapshot()); - assertEquals(2, mockRaftActor.getReplicatedLog().size()); assertNotNull(mockRaftActor.getReplicatedLog().get(3)); @@ -755,8 +761,6 @@ public class RaftActorTest extends AbstractActorTest { } }; - - } @Test @@ -780,13 
+784,15 @@ public class RaftActorTest extends AbstractActorTest { oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class))); oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class))); - oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,2,mock(Payload.class))); + oldReplicatedLog.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + mock(Payload.class))); ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = mock(Snapshot.class); @@ -798,9 +804,11 @@ public class RaftActorTest extends AbstractActorTest { verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes)); - assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); + assertTrue("The replicatedLog should have changed", + oldReplicatedLog != mockRaftActor.getReplicatedLog()); - assertEquals("lastApplied should be same as in the snapshot", (Long) 3L, mockRaftActor.getLastApplied()); + assertEquals("lastApplied should be same as in the snapshot", + (Long) 3L, mockRaftActor.getLastApplied()); assertEquals(0, mockRaftActor.getReplicatedLog().size()); @@ -833,6 +841,10 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); mockRaftActor.onReceiveCommand(new 
CaptureSnapshotReply(snapshotBytes)); @@ -843,8 +855,6 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Snapshot index should not have advanced because save snapshot failed", -1, mockRaftActor.getReplicatedLog().getSnapshotIndex()); - assertNull("Snapshot should be null", mockRaftActor.getReplicatedLog().getSnapshot()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 030b4b2376..705c69607c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,15 +1,14 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Props; import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; import com.google.protobuf.ByteString; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -17,6 +16,8 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import 
org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -29,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; +import scala.concurrent.duration.FiniteDuration; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -36,7 +38,7 @@ import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -72,8 +74,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); + MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); Map peerAddresses = new HashMap(); @@ -155,10 +156,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }.get(); // this extracts the received message assertEquals("match", out); - } - - }; }}; } @@ -214,229 +212,360 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } @Test - public void testSendInstallSnapshot() { - new LeaderTestKit(getSystem()) {{ + public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef followerActor = 
getSystem().actorOf(Props.create(MessageCollectorActor.class)); - new Within(duration("1 seconds")) { - protected void run() { - ActorRef followerActor = getTestActor(); + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); - Map peerAddresses = new HashMap(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(leaderActor); + actorContext.setPeerAddresses(peerAddresses); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(getRef()); - actorContext.setPeerAddresses(peerAddresses); + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setCommitIndex(followersLastIndex); + //set follower timeout to 2 mins, helps during debugging + actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + MockLeader leader = new MockLeader(actorContext); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new 
MockRaftActorContext.MockPayload("D")); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshot( - toByteString(leadersSnapshot)); - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + //update follower timestamp + leader.markFollowerActive(followerActor.path().toString()); - MockLeader leader = new MockLeader(actorContext); - // set the follower info in leader - leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + leader.createFollowerToSnapshot(followerActor.path().toString(), bs); - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + //send first chunk and no InstallSnapshotReply received yet + leader.getFollowerToSnapshot().getNextChunk(); + leader.getFollowerToSnapshot().incrementChunkIndex(); - // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex - RaftActorBehavior raftBehavior = leader.handleMessage( - senderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new SendHeartBeat()); - assertTrue(raftBehavior instanceof Leader); + AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching( + followerActor, AppendEntries.SERIALIZABLE_CLASS); - // we might receive some heartbeat messages, so wait till we SendInstallSnapshot - Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { - @Override - protected Boolean match(Object o) throws Exception { - if (o instanceof SendInstallSnapshot) { - return true; - } - return false; - } - }.get(); + assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " + + "received", aeproto); - boolean sendInstallSnapshotReceived = 
false; - for (Boolean b: matches) { - sendInstallSnapshotReceived = b | sendInstallSnapshotReceived; - } + AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto); + + assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty()); + + //InstallSnapshotReply received + leader.getFollowerToSnapshot().markSendStatus(true); + + leader.handleMessage(senderActor, new SendHeartBeat()); + + InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot) + MessageCollectorActor.getFirstMatching(followerActor, + InstallSnapshot.SERIALIZABLE_CLASS); + + assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot", + isproto); + + InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto); - assertTrue(sendInstallSnapshotReceived); + assertEquals(snapshotIndex, is.getLastIncludedIndex()); + }}; + } + + @Test + public void testSendAppendEntriesSnapshotScenario() { + new JavaTestKit(getSystem()) {{ + + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(getRef()); + actorContext.setPeerAddresses(peerAddresses); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setCommitIndex(followersLastIndex); + + Leader leader = new 
Leader(actorContext); + + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + //update follower timestamp + leader.markFollowerActive(followerActor.path().toString()); + + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex + RaftActorBehavior raftBehavior = leader.handleMessage( + senderActor, new Replicate(null, "state-id", entry)); + + assertTrue(raftBehavior instanceof Leader); + + // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot + Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { + @Override + protected Boolean match(Object o) throws Exception { + if (o instanceof InitiateInstallSnapshot) { + return true; + } + return false; } - }; + }.get(); + + boolean initiateInitiateInstallSnapshot = false; + for (Boolean b: matches) { + initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot; + } + + assertTrue(initiateInitiateInstallSnapshot); }}; } @Test - public void testInstallSnapshot() { - new LeaderTestKit(getSystem()) {{ + public void testInitiateInstallSnapshot() throws Exception { + new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - protected void run() { - ActorRef followerActor = getTestActor(); + ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); - Map peerAddresses = new HashMap(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); - actorContext.setPeerAddresses(peerAddresses); + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", 
"C"); + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(leaderActor); + actorContext.setPeerAddresses(peerAddresses); - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot)); - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setLastApplied(3); + actorContext.setCommitIndex(followersLastIndex); - MockLeader leader = new MockLeader(actorContext); - // set the follower info in leader - leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + Leader leader = new Leader(actorContext); + // set the snapshot as absent and check if capture-snapshot is invoked. 
+ leader.setSnapshot(Optional.absent()); - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); - RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot()); + actorContext.getReplicatedLog().append(entry); - assertTrue(raftBehavior instanceof Leader); + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex + RaftActorBehavior raftBehavior = leader.handleMessage( + leaderActor, new InitiateInstallSnapshot()); - // check if installsnapshot gets called with the correct values. - final String out = - new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof InstallSnapshotMessages.InstallSnapshot) { - InstallSnapshot is = (InstallSnapshot) - SerializationUtils.fromSerializable(in); - if (is.getData() == null) { - return "InstallSnapshot data is null"; - } - if (is.getLastIncludedIndex() != snapshotIndex) { - return is.getLastIncludedIndex() + "!=" + snapshotIndex; - } - if (is.getLastIncludedTerm() != snapshotTerm) { - return is.getLastIncludedTerm() + "!=" + snapshotTerm; - } - if (is.getTerm() == currentTerm) { - return is.getTerm() + "!=" + currentTerm; - } + CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor. 
+ getFirstMatching(leaderActor, CaptureSnapshot.class); - return "match"; + assertNotNull(cs); - } else { - return "message mismatch:" + in.getClass(); - } + assertTrue(cs.isInstallSnapshotInitiated()); + assertEquals(3, cs.getLastAppliedIndex()); + assertEquals(1, cs.getLastAppliedTerm()); + assertEquals(4, cs.getLastIndex()); + assertEquals(2, cs.getLastTerm()); + }}; + } + + @Test + public void testInstallSnapshot() { + new JavaTestKit(getSystem()) {{ + + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(followersLastIndex); + + Leader leader = new Leader(actorContext); + + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, + new SendInstallSnapshot(toByteString(leadersSnapshot))); + + assertTrue(raftBehavior instanceof Leader); + + // check if installsnapshot gets called with the correct values. 
+ final String out = + new ExpectMsg(duration("1 seconds"), "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof InstallSnapshotMessages.InstallSnapshot) { + InstallSnapshot is = (InstallSnapshot) + SerializationUtils.fromSerializable(in); + if (is.getData() == null) { + return "InstallSnapshot data is null"; + } + if (is.getLastIncludedIndex() != snapshotIndex) { + return is.getLastIncludedIndex() + "!=" + snapshotIndex; + } + if (is.getLastIncludedTerm() != snapshotTerm) { + return is.getLastIncludedTerm() + "!=" + snapshotTerm; + } + if (is.getTerm() == currentTerm) { + return is.getTerm() + "!=" + currentTerm; } - }.get(); // this extracts the received message - assertEquals("match", out); - } - }; + return "match"; + + } else { + return "message mismatch:" + in.getClass(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); }}; } @Test public void testHandleInstallSnapshotReplyLastChunk() { - new LeaderTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { - protected void run() { - ActorRef followerActor = getTestActor(); + new JavaTestKit(getSystem()) {{ - Map peerAddresses = new HashMap(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); - actorContext.setPeerAddresses(peerAddresses); + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; - final int snapshotTerm = 1; - final int currentTerm = 2; - - MockLeader leader = new MockLeader(actorContext); - // set the follower info in leader - leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); - - Map leadersSnapshot = new 
HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - - // set the snapshot variables in replicatedlog - actorContext.getReplicatedLog().setSnapshot( - toByteString(leadersSnapshot)); - actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); - actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - - ByteString bs = toByteString(leadersSnapshot); - leader.createFollowerToSnapshot(followerActor.path().toString(), bs); - while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) { - leader.getFollowerToSnapshot().getNextChunk(); - leader.getFollowerToSnapshot().incrementChunkIndex(); - } + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); - RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, - new InstallSnapshotReply(currentTerm, followerActor.path().toString(), - leader.getFollowerToSnapshot().getChunkIndex(), true)); + MockLeader leader = new MockLeader(actorContext); - assertTrue(raftBehavior instanceof Leader); + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); - assertEquals(leader.mapFollowerToSnapshot.size(), 0); - assertEquals(leader.followerToLog.size(), 1); - assertNotNull(leader.followerToLog.get(followerActor.path().toString())); - FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString()); - assertEquals(snapshotIndex, fli.getMatchIndex().get()); - 
assertEquals(snapshotIndex, fli.getMatchIndex().get()); - assertEquals(snapshotIndex + 1, fli.getNextIndex().get()); - } - }; + // set the snapshot variables in replicatedlog + + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + leader.createFollowerToSnapshot(followerActor.path().toString(), bs); + while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) { + leader.getFollowerToSnapshot().getNextChunk(); + leader.getFollowerToSnapshot().incrementChunkIndex(); + } + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, + new InstallSnapshotReply(currentTerm, followerActor.path().toString(), + leader.getFollowerToSnapshot().getChunkIndex(), true)); + + assertTrue(raftBehavior instanceof Leader); + + assertEquals(leader.mapFollowerToSnapshot.size(), 0); + assertEquals(leader.followerToLog.size(), 1); + assertNotNull(leader.followerToLog.get(followerActor.path().toString())); + FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex + 1, fli.getNextIndex().get()); }}; } @@ -584,6 +713,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { followerActorContext.setCommitIndex(1); Leader leader = new Leader(leaderActorContext); + leader.markFollowerActive(followerActor.path().toString()); leader.handleMessage(leaderActor, new SendHeartBeat()); @@ -652,6 +782,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { followerActorContext.setCommitIndex(2); Leader leader = new 
Leader(leaderActorContext); + leader.markFollowerActive(followerActor.path().toString()); leader.handleMessage(leaderActor, new SendHeartBeat()); @@ -816,30 +947,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } - private static class LeaderTestKit extends JavaTestKit { - - private LeaderTestKit(ActorSystem actorSystem) { - super(actorSystem); - } - - protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ - // Wait for a specific log message to show up - final boolean result = - new JavaTestKit.EventFilter(logLevel - ) { - @Override - protected Boolean run() { - return true; - } - }.from(subject.path().toString()) - .message(logMessage) - .occurrences(1).exec(); - - Assert.assertEquals(true, result); - - } - } - class MockLeader extends Leader { FollowerToSnapshot fts; @@ -848,14 +955,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { super(context); } - public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - new AtomicLong(nextIndex), - new AtomicLong(matchIndex)); - followerToLog.put(followerId, followerLogInformation); - } - public FollowerToSnapshot getFollowerToSnapshot() { return fts; } @@ -866,4 +965,26 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } } + + private class MockConfigParamsImpl extends DefaultConfigParamsImpl { + + private long electionTimeOutIntervalMillis; + private int snapshotChunkSize; + + public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { + super(); + this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; + this.snapshotChunkSize = snapshotChunkSize; + } + + @Override + public FiniteDuration getElectionTimeOutInterval() { + return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS); + } + + @Override + public int getSnapshotChunkSize() { + return 
snapshotChunkSize; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf index 8a45108f8b..818ddf7d85 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf @@ -7,7 +7,7 @@ akka { actor { # enable to test serialization only. - serialize-messages = on + serialize-messages = off serializers { java = "akka.serialization.JavaSerializer" -- 2.36.6