From: Moiz Raja Date: Tue, 25 Nov 2014 05:22:34 +0000 (-0800) Subject: BUG 2371 : Leader should reset it's snapshot tracking when follower is restarted X-Git-Tag: release/lithium~819 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3e3f54ed3ff69a24f663cf2d3b10e61c49c58ccd BUG 2371 : Leader should reset it's snapshot tracking when follower is restarted This patch adds a new protocol to InstallSnapshot. It the InstallSnapshotReply returns a failure and the chunkIndex is -1 then the Leader will reset the FollowerSnapshot so that when the next heartbeat occurs the Leader would start sending chunks from the beginning. Change-Id: I0d5f0a4230209856ecf9bcef46220ae348f52b5d Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index d85ac8ef67..e5c5dc752d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4). */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { + + // The index of the first chunk that is sent when installing a snapshot + public static final int FIRST_CHUNK_INDEX = 1; + + // The index that the follower should respond with if it needs the install snapshot to be reset + public static final int INVALID_CHUNK_INDEX = -1; + + // This would be passed as the hash code of the last chunk when sending the first chunk + public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; + protected final Map followerToLog = new HashMap<>(); protected final Map mapFollowerToSnapshot = new HashMap<>(); @@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { "sending snapshot chunk failed, Will retry, Chunk:{}", reply.getChunkIndex() ); + followerToSnapshot.markSendStatus(false); } @@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { " or Chunk Index in InstallSnapshotReply not matching {} != {}", followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); + + if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + // Since the Follower did not find this index to be valid we should reset the follower snapshot + // so that Installing the snapshot can resume from the beginning + followerToSnapshot.reset(); + } } } @@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotTerm(), getNextSnapshotChunk(followerId,snapshot.get()), mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks() + mapFollowerToSnapshot.get(followerId).getTotalChunks(), + Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode()) ).toSerializable(), actor() ); @@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private boolean replyStatus = false; private int chunkIndex; private int totalChunks; + private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; public FollowerToSnapshot(ByteString snapshotBytes) { this.snapshotBytes = snapshotBytes; - replyReceivedForOffset = -1; - chunkIndex = 1; int size = snapshotBytes.size(); totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); @@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("Snapshot {} bytes, total chunks to send:{}", size, totalChunks); } + replyReceivedForOffset = -1; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; } public ByteString getSnapshotBytes() { @@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if the chunk sent was successful replyReceivedForOffset = offset; replyStatus = true; + lastChunkHashCode = nextChunkHashCode; } else { // if the chunk sent was failure replyReceivedForOffset = offset; @@ -715,8 +736,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("length={}, offset={},size={}", snapshotLength, start, size); } - return getSnapshotBytes().substring(start, start + size); + ByteString substring = getSnapshotBytes().substring(start, start + size); + nextChunkHashCode = substring.hashCode(); + return substring; + } + + /** + * reset should be called when the Follower needs to be sent the snapshot from the beginning + */ + public void reset(){ + offset = 0; + replyStatus = false; + replyReceivedForOffset = offset; + chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; + lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + } + public int getLastChunkHashCode() { + return lastChunkHashCode; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 7ada8b31c5..b1c73f6f41 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,7 +9,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import java.util.ArrayList; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import java.util.ArrayList; - /** * The behavior of a RaftActor in the Follower state *

@@ -36,7 +36,8 @@ import java.util.ArrayList; * */ public class Follower extends AbstractRaftActorBehavior { - private ByteString snapshotChunksCollected = ByteString.EMPTY; + + private SnapshotTracker snapshotTracker = null; public Follower(RaftActorContext context) { super(context); @@ -280,48 +281,54 @@ public class Follower extends AbstractRaftActorBehavior { ); } - try { - if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) { - // this is the last chunk, create a snapshot object and apply - - snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); - if(LOG.isDebugEnabled()) { - LOG.debug("Last chunk received: snapshotChunksCollected.size:{}", - snapshotChunksCollected.size()); - } + if(snapshotTracker == null){ + snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); + } - Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(), - new ArrayList(), - installSnapshot.getLastIncludedIndex(), - installSnapshot.getLastIncludedTerm(), - installSnapshot.getLastIncludedIndex(), - installSnapshot.getLastIncludedTerm()); + try { + if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), + installSnapshot.getLastChunkHashCode())){ + Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(), + new ArrayList(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm()); actor().tell(new ApplySnapshot(snapshot), actor()); - } else { - // we have more to go - snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); + snapshotTracker = null; - if(LOG.isDebugEnabled()) { - LOG.debug("Chunk={},snapshotChunksCollected.size:{}", - installSnapshot.getChunkIndex(), snapshotChunksCollected.size()); - } } sender.tell(new InstallSnapshotReply( - currentTerm(), context.getId(), installSnapshot.getChunkIndex(), - true), actor()); + currentTerm(), context.getId(), installSnapshot.getChunkIndex(), + true), actor()); + + } catch (SnapshotTracker.InvalidChunkException e) { + + sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), + -1, false), actor()); + snapshotTracker = null; + + } catch (Exception e){ - } catch (Exception e) { LOG.error(e, "Exception in InstallSnapshot of follower:"); //send reply with success as false. The chunk will be sent again on failure sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), - installSnapshot.getChunkIndex(), false), actor()); + installSnapshot.getChunkIndex(), false), actor()); + } } @Override public void close() throws Exception { stopElection(); } + + @VisibleForTesting + ByteString getSnapshotChunksCollected(){ + return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY; + } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java new file mode 100644 index 0000000000..26fbde0711 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.event.LoggingAdapter; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; + +/** + * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower + */ +public class SnapshotTracker { + private final LoggingAdapter LOG; + private final int totalChunks; + private ByteString collectedChunks = ByteString.EMPTY; + private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1; + private boolean sealed = false; + private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + + SnapshotTracker(LoggingAdapter LOG, int totalChunks){ + this.LOG = LOG; + this.totalChunks = totalChunks; + } + + /** + * Adds a chunk to the tracker + * + * @param chunkIndex + * @param chunk + * @return true when the lastChunk is received + * @throws InvalidChunkException + */ + boolean addChunk(int chunkIndex, ByteString chunk, Optional lastChunkHashCode) throws InvalidChunkException{ + if(sealed){ + throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received"); + } + + if(lastChunkIndex + 1 != chunkIndex){ + throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex); + } + + if(lastChunkHashCode.isPresent()){ + if(lastChunkHashCode.get() != this.lastChunkHashCode){ + throw new InvalidChunkException("The hash code of the recorded last chunk does not match " + + "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get()); + } + } + + if(LOG.isDebugEnabled()) { + LOG.debug("Chunk={},collectedChunks.size:{}", + chunkIndex, collectedChunks.size()); + } + + sealed = (chunkIndex == totalChunks); + lastChunkIndex = chunkIndex; + collectedChunks = collectedChunks.concat(chunk); + this.lastChunkHashCode = chunk.hashCode(); + return sealed; + } + + byte[] getSnapshot(){ + if(!sealed) { + throw new IllegalStateException("lastChunk not received yet"); + } + + return collectedChunks.toByteArray(); + } + + ByteString getCollectedChunks(){ + return collectedChunks; + } + + public static class InvalidChunkException extends Exception { + InvalidChunkException(String message){ + super(message); + } + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java index 3c4e8117c7..6337f8f6dc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.raft.messages; +import com.google.common.base.Optional; import com.google.protobuf.ByteString; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; @@ -22,9 +23,10 @@ public class InstallSnapshot extends AbstractRaftRPC { private final ByteString data; private final int chunkIndex; private final int totalChunks; + private final Optional lastChunkHashCode; public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, - long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) { + long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional lastChunkHashCode) { super(term); this.leaderId = leaderId; this.lastIncludedIndex = lastIncludedIndex; @@ -32,8 +34,15 @@ public class InstallSnapshot extends AbstractRaftRPC { this.data = data; this.chunkIndex = chunkIndex; this.totalChunks = totalChunks; + this.lastChunkHashCode = lastChunkHashCode; } + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, + long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) { + this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.absent()); + } + + public String getLeaderId() { return leaderId; } @@ -58,25 +67,38 @@ public class InstallSnapshot extends AbstractRaftRPC { return totalChunks; } - public Object toSerializable(){ - return InstallSnapshotMessages.InstallSnapshot.newBuilder() - .setLeaderId(this.getLeaderId()) - .setChunkIndex(this.getChunkIndex()) - .setData(this.getData()) - .setLastIncludedIndex(this.getLastIncludedIndex()) - .setLastIncludedTerm(this.getLastIncludedTerm()) - .setTotalChunks(this.getTotalChunks()).build(); + public Optional getLastChunkHashCode() { + return lastChunkHashCode; + } + public Object toSerializable(){ + InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder() + .setLeaderId(this.getLeaderId()) + .setChunkIndex(this.getChunkIndex()) + .setData(this.getData()) + .setLastIncludedIndex(this.getLastIncludedIndex()) + .setLastIncludedTerm(this.getLastIncludedTerm()) + .setTotalChunks(this.getTotalChunks()); + + if(lastChunkHashCode.isPresent()){ + builder.setLastChunkHashCode(lastChunkHashCode.get()); + } + return builder.build(); } public static InstallSnapshot fromSerializable (Object o) { InstallSnapshotMessages.InstallSnapshot from = (InstallSnapshotMessages.InstallSnapshot) o; + Optional lastChunkHashCode = Optional.absent(); + if(from.hasLastChunkHashCode()){ + lastChunkHashCode = Optional.of(from.getLastChunkHashCode()); + } + InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(), from.getLeaderId(), from.getLastIncludedIndex(), from.getLastIncludedTerm(), from.getData(), - from.getChunkIndex(), from.getTotalChunks()); + from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode); return installSnapshot; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 83b9ad3ec7..0ee9693d32 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,9 +1,20 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; import com.google.protobuf.ByteString; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -20,19 +31,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - public class FollowerTest extends AbstractRaftActorBehaviorTest { private final ActorRef followerActor = getSystem().actorOf(Props.create( @@ -452,18 +450,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { int offset = 0; int snapshotLength = bsSnapshot.size(); int i = 1; + int chunkIndex = 1; do { chunkData = getNextChunk(bsSnapshot, offset); final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", i, 1, - chunkData, i, 3); + chunkData, chunkIndex, 3); follower.handleMessage(leaderActor, installSnapshot); offset = offset + 50; i++; + chunkIndex++; } while ((offset+50) < snapshotLength); - final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3); + final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3); follower.handleMessage(leaderActor, installSnapshot3); String[] matches = new ReceiveWhile(String.class, duration("2 seconds")) { @@ -490,6 +490,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } }.get(); + // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty + assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected()); + String applySnapshotMatch = ""; for (String reply: matches) { if (reply.startsWith("applySnapshot")) { @@ -517,6 +520,52 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { }}; } + @Test + public void testHandleOutOfSequenceInstallSnapshot() throws Exception { + JavaTestKit javaTestKit = new JavaTestKit(getSystem()) { + { + + ActorRef leaderActor = getSystem().actorOf(Props.create( + MessageCollectorActor.class)); + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(getRef()); + + Follower follower = (Follower) createBehavior(context); + + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + ByteString bsSnapshot = toByteString(followerSnapshot); + + final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3); + follower.handleMessage(leaderActor, installSnapshot); + + Object messages = executeLocalOperation(leaderActor, "get-all-messages"); + + assertNotNull(messages); + assertTrue(messages instanceof List); + List listMessages = (List) messages; + + int installSnapshotReplyReceivedCount = 0; + for (Object message: listMessages) { + if (message instanceof InstallSnapshotReply) { + ++installSnapshotReplyReceivedCount; + } + } + + assertEquals(1, installSnapshotReplyReceivedCount); + InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0); + assertEquals(false, reply.isSuccess()); + assertEquals(-1, reply.getChunkIndex()); + assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected()); + + + }}; + } + public Object executeLocalOperation(ActorRef actor, Object message) throws Exception { return MessageCollectorActor.getAllMessages(actor); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 0fc0b4ccfd..895fe35bff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,11 +1,16 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -41,9 +46,6 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import scala.concurrent.duration.FiniteDuration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; public class LeaderTest extends AbstractRaftActorBehaviorTest { @@ -125,7 +127,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + followerActor.path().toString()); actorContext.setPeerAddresses(peerAddresses); @@ -570,6 +572,156 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }}; } + @Test + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { + new JavaTestKit(getSystem()) {{ + + TestActorRef followerActor = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + + Object o = MessageCollectorActor.getAllMessages(followerActor).get(0); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + + + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false)); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + o = MessageCollectorActor.getAllMessages(followerActor).get(1); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + + followerActor.tell(PoisonPill.getInstance(), getRef()); + }}; + } + + @Test + public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { + new JavaTestKit(getSystem()) { + { + + TestActorRef followerActor = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + actorContext.setPeerAddresses(peerAddresses); + actorContext.setCommitIndex(followersLastIndex); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.setSnapshot(Optional.of(bs)); + + leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + + Object o = MessageCollectorActor.getAllMessages(followerActor).get(0); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(1, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode()); + + int hashCode = installSnapshot.getData().hashCode(); + + leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + o = MessageCollectorActor.getAllMessages(followerActor).get(1); + + assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + + installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + + assertEquals(2, installSnapshot.getChunkIndex()); + assertEquals(3, installSnapshot.getTotalChunks()); + assertEquals(hashCode, installSnapshot.getLastChunkHashCode()); + + followerActor.tell(PoisonPill.getInstance(), getRef()); + }}; + } + @Test public void testFollowerToSnapshotLogic() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java new file mode 100644 index 0000000000..1b3a8f5fb5 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -0,0 +1,181 @@ +package org.opendaylight.controller.cluster.raft.behaviors; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import akka.event.LoggingAdapter; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SnapshotTrackerTest { + + Map data; + ByteString byteString; + ByteString chunk1; + ByteString chunk2; + ByteString chunk3; + + @Before + public void setup(){ + data = new HashMap<>(); + data.put("key1", "value1"); + data.put("key2", "value2"); + data.put("key3", "value3"); + + byteString = toByteString(data); + chunk1 = getNextChunk(byteString, 0, 10); + chunk2 = getNextChunk(byteString, 10, 10); + chunk3 = getNextChunk(byteString, 20, byteString.size()); + } + + @Test + public void testAddChunk() throws SnapshotTracker.InvalidChunkException { + SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + + tracker1.addChunk(1, chunk1, Optional.absent()); + tracker1.addChunk(2, chunk2, Optional.absent()); + tracker1.addChunk(3, chunk3, Optional.absent()); + + // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker + SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker2.addChunk(1, chunk1, Optional.absent()); + tracker2.addChunk(2, chunk2, Optional.absent()); + + try { + tracker2.addChunk(3, chunk3, Optional.absent()); + Assert.fail(); + } catch(SnapshotTracker.InvalidChunkException e){ + e.getMessage().startsWith("Invalid chunk"); + } + + // The first chunk's index must at least be FIRST_CHUNK_INDEX + SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + try { + tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); + Assert.fail(); + } catch(SnapshotTracker.InvalidChunkException e){ + + } + + // Out of sequence chunk indexes won't work + SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); + + try { + tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.absent()); + Assert.fail(); + } catch(SnapshotTracker.InvalidChunkException e){ + + } + + // No exceptions will be thrown when invalid chunk is added with the right sequence + // If the lastChunkHashCode is missing + SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); + // Look I can add the same chunk again + tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.absent()); + + // An exception will be thrown when an invalid chunk is addedd with the right sequence + // when the lastChunkHashCode is present + SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + + tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1)); + + try { + // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777 + tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777)); + Assert.fail(); + }catch(SnapshotTracker.InvalidChunkException e){ + + } + + } + + @Test + public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException { + + // Trying to get a snapshot before all chunks have been received will throw an exception + SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + + tracker1.addChunk(1, chunk1, Optional.absent()); + try { + tracker1.getSnapshot(); + Assert.fail(); + } catch(IllegalStateException e){ + + } + + SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3); + + tracker2.addChunk(1, chunk1, Optional.absent()); + tracker2.addChunk(2, chunk2, Optional.absent()); + tracker2.addChunk(3, chunk3, Optional.absent()); + + byte[] snapshot = tracker2.getSnapshot(); + + assertEquals(byteString, ByteString.copyFrom(snapshot)); + } + + @Test + public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException { + SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + + ByteString chunks = chunk1.concat(chunk2); + + tracker1.addChunk(1, chunk1, Optional.absent()); + tracker1.addChunk(2, chunk2, Optional.absent()); + + assertEquals(chunks, tracker1.getCollectedChunks()); + } + + public ByteString getNextChunk (ByteString bs, int offset, int size){ + int snapshotLength = bs.size(); + int start = offset; + if (size > snapshotLength) { + size = snapshotLength; + } else { + if ((start + size) > snapshotLength) { + size = snapshotLength - start; + } + } + return bs.substring(start, start + size); + } + + private ByteString toByteString(Map state) { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(state); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } catch (IOException e) { + org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e); + } + return null; + } + + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 9fbdd4587f..3469a956c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -13,15 +13,14 @@ import akka.actor.UntypedActor; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class MessageCollectorActor extends UntypedActor { private List messages = new ArrayList<>(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java index b93be3e009..de7f44e213 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java @@ -85,6 +85,16 @@ public final class InstallSnapshotMessages { * optional int32 totalChunks = 7; */ int getTotalChunks(); + + // optional int32 lastChunkHashCode = 8; + /** + * optional int32 lastChunkHashCode = 8; + */ + boolean hasLastChunkHashCode(); + /** + * optional int32 lastChunkHashCode = 8; + */ + int getLastChunkHashCode(); } /** * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot} @@ -172,6 +182,11 @@ public final class InstallSnapshotMessages { totalChunks_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000080; + lastChunkHashCode_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -351,6 +366,22 @@ public final class InstallSnapshotMessages { return totalChunks_; } + // optional int32 lastChunkHashCode = 8; + public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8; + private int lastChunkHashCode_; + /** + * optional int32 lastChunkHashCode = 8; + */ + public boolean hasLastChunkHashCode() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public int getLastChunkHashCode() { + return lastChunkHashCode_; + } + private void initFields() { term_ = 0L; leaderId_ = ""; @@ -359,6 +390,7 @@ public final class InstallSnapshotMessages { data_ = com.google.protobuf.ByteString.EMPTY; chunkIndex_ = 0; totalChunks_ = 0; + lastChunkHashCode_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -393,6 +425,9 @@ public final class InstallSnapshotMessages { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeInt32(7, totalChunks_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeInt32(8, lastChunkHashCode_); + } getUnknownFields().writeTo(output); } @@ -430,6 +465,10 @@ public final class InstallSnapshotMessages { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, totalChunks_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(8, lastChunkHashCode_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -560,6 +599,8 @@ public final class InstallSnapshotMessages { bitField0_ = (bitField0_ & ~0x00000020); totalChunks_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + lastChunkHashCode_ = 0; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -616,6 +657,10 @@ public final class InstallSnapshotMessages { to_bitField0_ |= 0x00000040; } result.totalChunks_ = totalChunks_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.lastChunkHashCode_ = lastChunkHashCode_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -655,6 +700,9 @@ public final class InstallSnapshotMessages { if (other.hasTotalChunks()) { setTotalChunks(other.getTotalChunks()); } + if (other.hasLastChunkHashCode()) { + setLastChunkHashCode(other.getLastChunkHashCode()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -957,6 +1005,39 @@ public final class InstallSnapshotMessages { return this; } + // optional int32 lastChunkHashCode = 8; + private int lastChunkHashCode_ ; + /** + * optional int32 lastChunkHashCode = 8; + */ + public boolean hasLastChunkHashCode() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public int getLastChunkHashCode() { + return lastChunkHashCode_; + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public Builder setLastChunkHashCode(int value) { + bitField0_ |= 0x00000080; + lastChunkHashCode_ = value; + onChanged(); + return this; + } + /** + * optional int32 lastChunkHashCode = 8; + */ + public Builder clearLastChunkHashCode() { + bitField0_ = (bitField0_ & ~0x00000080); + lastChunkHashCode_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot) } @@ -983,13 +1064,14 @@ public final class InstallSnapshotMessages { static { java.lang.String[] descriptorData = { "\n\025InstallSnapshot.proto\022(org.opendayligh" + - "t.controller.cluster.raft\"\235\001\n\017InstallSna" + + "t.controller.cluster.raft\"\270\001\n\017InstallSna" + "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" + "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" + "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" + - " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" + - "light.controller.protobuff.messages.clus" + - "ter.raftB\027InstallSnapshotMessagesH\001" + " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" + + "shCode\030\010 \001(\005BX\n;org.opendaylight.control" + + "ler.protobuff.messages.cluster.raftB\027Ins" + + "tallSnapshotMessagesH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1001,7 +1083,7 @@ public final class InstallSnapshotMessages { internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor, - new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", }); + new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", }); return null; } }; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto index 4198644b13..adb58ae4af 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto @@ -12,4 +12,5 @@ message InstallSnapshot { optional bytes data = 5; optional int32 chunkIndex = 6; optional int32 totalChunks = 7; + optional int32 lastChunkHashCode = 8; }