This patch adds a new protocol to InstallSnapshot. It the InstallSnapshotReply returns
a failure and the chunkIndex is -1 then the Leader will reset the FollowerSnapshot so
that when the next heartbeat occurs the Leader would start sending chunks from the beginning.
Change-Id: I0d5f0a4230209856ecf9bcef46220ae348f52b5d
Signed-off-by: Moiz Raja <moraja@cisco.com>
* set commitIndex = N (§5.3, §5.4).
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+ // The index of the first chunk that is sent when installing a snapshot
+ public static final int FIRST_CHUNK_INDEX = 1;
+
+ // The index that the follower should respond with if it needs the install snapshot to be reset
+ public static final int INVALID_CHUNK_INDEX = -1;
+
+ // This would be passed as the hash code of the last chunk when sending the first chunk
+ public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
"sending snapshot chunk failed, Will retry, Chunk:{}",
reply.getChunkIndex()
);
+
followerToSnapshot.markSendStatus(false);
}
" or Chunk Index in InstallSnapshotReply not matching {} != {}",
followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
);
+
+ if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ // Since the Follower did not find this index to be valid we should reset the follower snapshot
+ // so that Installing the snapshot can resume from the beginning
+ followerToSnapshot.reset();
+ }
}
}
context.getReplicatedLog().getSnapshotTerm(),
getNextSnapshotChunk(followerId,snapshot.get()),
mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+ Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
).toSerializable(),
actor()
);
private boolean replyStatus = false;
private int chunkIndex;
private int totalChunks;
+ private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
- replyReceivedForOffset = -1;
- chunkIndex = 1;
int size = snapshotBytes.size();
totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
LOG.debug("Snapshot {} bytes, total chunks to send:{}",
size, totalChunks);
}
+ replyReceivedForOffset = -1;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
}
public ByteString getSnapshotBytes() {
// if the chunk sent was successful
replyReceivedForOffset = offset;
replyStatus = true;
+ lastChunkHashCode = nextChunkHashCode;
} else {
// if the chunk sent was failure
replyReceivedForOffset = offset;
LOG.debug("length={}, offset={},size={}",
snapshotLength, start, size);
}
- return getSnapshotBytes().substring(start, start + size);
+ ByteString substring = getSnapshotBytes().substring(start, start + size);
+ nextChunkHashCode = substring.hashCode();
+ return substring;
+ }
+
+ /**
+ * reset should be called when the Follower needs to be sent the snapshot from the beginning
+ */
+ public void reset(){
+ offset = 0;
+ replyStatus = false;
+ replyReceivedForOffset = offset;
+ chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+ lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+ }
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode;
}
}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import java.util.ArrayList;
-
/**
* The behavior of a RaftActor in the Follower state
* <p/>
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
- private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
+ private SnapshotTracker snapshotTracker = null;
public Follower(RaftActorContext context) {
super(context);
);
}
- try {
- if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
- // this is the last chunk, create a snapshot object and apply
-
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
- snapshotChunksCollected.size());
- }
+ if(snapshotTracker == null){
+ snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+ }
- Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
- new ArrayList<ReplicatedLogEntry>(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm(),
- installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm());
+ try {
+ if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+ installSnapshot.getLastChunkHashCode())){
+ Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
actor().tell(new ApplySnapshot(snapshot), actor());
- } else {
- // we have more to go
- snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ snapshotTracker = null;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
- installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
- }
}
sender.tell(new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
- true), actor());
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (SnapshotTracker.InvalidChunkException e) {
+
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ -1, false), actor());
+ snapshotTracker = null;
+
+ } catch (Exception e){
- } catch (Exception e) {
LOG.error(e, "Exception in InstallSnapshot of follower:");
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
- installSnapshot.getChunkIndex(), false), actor());
+ installSnapshot.getChunkIndex(), false), actor());
+
}
}
@Override public void close() throws Exception {
stopElection();
}
+
+ @VisibleForTesting
+ ByteString getSnapshotChunksCollected(){
+ return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+ }
+
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+
+/**
+ * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ */
+public class SnapshotTracker {
+ private final LoggingAdapter LOG;
+ private final int totalChunks;
+ private ByteString collectedChunks = ByteString.EMPTY;
+ private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+ private boolean sealed = false;
+ private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+
+ SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+ this.LOG = LOG;
+ this.totalChunks = totalChunks;
+ }
+
+ /**
+ * Adds a chunk to the tracker
+ *
+ * @param chunkIndex
+ * @param chunk
+ * @return true when the lastChunk is received
+ * @throws InvalidChunkException
+ */
+ boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+ if(sealed){
+ throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+ }
+
+ if(lastChunkIndex + 1 != chunkIndex){
+ throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
+ }
+
+ if(lastChunkHashCode.isPresent()){
+ if(lastChunkHashCode.get() != this.lastChunkHashCode){
+ throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
+ "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+ }
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Chunk={},collectedChunks.size:{}",
+ chunkIndex, collectedChunks.size());
+ }
+
+ sealed = (chunkIndex == totalChunks);
+ lastChunkIndex = chunkIndex;
+ collectedChunks = collectedChunks.concat(chunk);
+ this.lastChunkHashCode = chunk.hashCode();
+ return sealed;
+ }
+
+ byte[] getSnapshot(){
+ if(!sealed) {
+ throw new IllegalStateException("lastChunk not received yet");
+ }
+
+ return collectedChunks.toByteArray();
+ }
+
+ ByteString getCollectedChunks(){
+ return collectedChunks;
+ }
+
+ public static class InvalidChunkException extends Exception {
+ InvalidChunkException(String message){
+ super(message);
+ }
+ }
+
+}
package org.opendaylight.controller.cluster.raft.messages;
+import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
private final ByteString data;
private final int chunkIndex;
private final int totalChunks;
+ private final Optional<Integer> lastChunkHashCode;
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
this.data = data;
this.chunkIndex = chunkIndex;
this.totalChunks = totalChunks;
+ this.lastChunkHashCode = lastChunkHashCode;
}
+ public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+ long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+ }
+
+
public String getLeaderId() {
return leaderId;
}
return totalChunks;
}
- public <T extends Object> Object toSerializable(){
- return InstallSnapshotMessages.InstallSnapshot.newBuilder()
- .setLeaderId(this.getLeaderId())
- .setChunkIndex(this.getChunkIndex())
- .setData(this.getData())
- .setLastIncludedIndex(this.getLastIncludedIndex())
- .setLastIncludedTerm(this.getLastIncludedTerm())
- .setTotalChunks(this.getTotalChunks()).build();
+ public Optional<Integer> getLastChunkHashCode() {
+ return lastChunkHashCode;
+ }
+ public <T extends Object> Object toSerializable(){
+ InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ .setLeaderId(this.getLeaderId())
+ .setChunkIndex(this.getChunkIndex())
+ .setData(this.getData())
+ .setLastIncludedIndex(this.getLastIncludedIndex())
+ .setLastIncludedTerm(this.getLastIncludedTerm())
+ .setTotalChunks(this.getTotalChunks());
+
+ if(lastChunkHashCode.isPresent()){
+ builder.setLastChunkHashCode(lastChunkHashCode.get());
+ }
+ return builder.build();
}
public static InstallSnapshot fromSerializable (Object o) {
InstallSnapshotMessages.InstallSnapshot from =
(InstallSnapshotMessages.InstallSnapshot) o;
+ Optional<Integer> lastChunkHashCode = Optional.absent();
+ if(from.hasLastChunkHashCode()){
+ lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+ }
+
InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
from.getLeaderId(), from.getLastIncludedIndex(),
from.getLastIncludedTerm(), from.getData(),
- from.getChunkIndex(), from.getTotalChunks());
+ from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
return installSnapshot;
}
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
public class FollowerTest extends AbstractRaftActorBehaviorTest {
private final ActorRef followerActor = getSystem().actorOf(Props.create(
int offset = 0;
int snapshotLength = bsSnapshot.size();
int i = 1;
+ int chunkIndex = 1;
do {
chunkData = getNextChunk(bsSnapshot, offset);
final InstallSnapshot installSnapshot =
new InstallSnapshot(1, "leader-1", i, 1,
- chunkData, i, 3);
+ chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot);
offset = offset + 50;
i++;
+ chunkIndex++;
} while ((offset+50) < snapshotLength);
- final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+ final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
follower.handleMessage(leaderActor, installSnapshot3);
String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
}
}.get();
+ // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
String applySnapshotMatch = "";
for (String reply: matches) {
if (reply.startsWith("applySnapshot")) {
}};
}
+ @Test
+ public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+ JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+ {
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(
+ MessageCollectorActor.class));
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext(getRef());
+
+ Follower follower = (Follower) createBehavior(context);
+
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ ByteString bsSnapshot = toByteString(followerSnapshot);
+
+ final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+ follower.handleMessage(leaderActor, installSnapshot);
+
+ Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+ assertNotNull(messages);
+ assertTrue(messages instanceof List);
+ List<Object> listMessages = (List<Object>) messages;
+
+ int installSnapshotReplyReceivedCount = 0;
+ for (Object message: listMessages) {
+ if (message instanceof InstallSnapshotReply) {
+ ++installSnapshotReplyReceivedCount;
+ }
+ }
+
+ assertEquals(1, installSnapshotReplyReceivedCount);
+ InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+ assertEquals(false, reply.isSuccess());
+ assertEquals(-1, reply.getChunkIndex());
+ assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+ }};
+ }
+
public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
return MessageCollectorActor.getAllMessages(actor);
}
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
}};
}
+ @Test
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
+ @Test
+ public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+ int hashCode = installSnapshot.getData().hashCode();
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+ o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+ followerActor.tell(PoisonPill.getInstance(), getRef());
+ }};
+ }
+
@Test
public void testFollowerToSnapshotLogic() {
--- /dev/null
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnapshotTrackerTest {
+
+ Map<String, String> data;
+ ByteString byteString;
+ ByteString chunk1;
+ ByteString chunk2;
+ ByteString chunk3;
+
+ @Before
+ public void setup(){
+ data = new HashMap<>();
+ data.put("key1", "value1");
+ data.put("key2", "value2");
+ data.put("key3", "value3");
+
+ byteString = toByteString(data);
+ chunk1 = getNextChunk(byteString, 0, 10);
+ chunk2 = getNextChunk(byteString, 10, 10);
+ chunk3 = getNextChunk(byteString, 20, byteString.size());
+ }
+
+ @Test
+ public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
+
+ // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
+ SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+
+ try {
+ tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+ e.getMessage().startsWith("Invalid chunk");
+ }
+
+ // The first chunk's index must at least be FIRST_CHUNK_INDEX
+ SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ try {
+ tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ // Out of sequence chunk indexes won't work
+ SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+
+ try {
+ tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+ Assert.fail();
+ } catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ // No exceptions will be thrown when invalid chunk is added with the right sequence
+ // If the lastChunkHashCode is missing
+ SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+ // Look I can add the same chunk again
+ tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+
+ // An exception will be thrown when an invalid chunk is addedd with the right sequence
+ // when the lastChunkHashCode is present
+ SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+ tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+
+ try {
+ // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
+ tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+ Assert.fail();
+ }catch(SnapshotTracker.InvalidChunkException e){
+
+ }
+
+ }
+
+ @Test
+ public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
+
+ // Trying to get a snapshot before all chunks have been received will throw an exception
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ try {
+ tracker1.getSnapshot();
+ Assert.fail();
+ } catch(IllegalStateException e){
+
+ }
+
+ SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+
+ tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+
+ byte[] snapshot = tracker2.getSnapshot();
+
+ assertEquals(byteString, ByteString.copyFrom(snapshot));
+ }
+
+ @Test
+ public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
+ SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+ ByteString chunks = chunk1.concat(chunk2);
+
+ tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+ tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+
+ assertEquals(chunks, tracker1.getCollectedChunks());
+ }
+
+ public ByteString getNextChunk (ByteString bs, int offset, int size){
+ int snapshotLength = bs.size();
+ int start = offset;
+ if (size > snapshotLength) {
+ size = snapshotLength;
+ } else {
+ if ((start + size) > snapshotLength) {
+ size = snapshotLength - start;
+ }
+ }
+ return bs.substring(start, start + size);
+ }
+
+ private ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(state);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ } catch (IOException e) {
+ org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+ }
+ return null;
+ }
+
+
+}
\ No newline at end of file
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
public class MessageCollectorActor extends UntypedActor {
private List<Object> messages = new ArrayList<>();
* <code>optional int32 totalChunks = 7;</code>
*/
int getTotalChunks();
+
+ // optional int32 lastChunkHashCode = 8;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ boolean hasLastChunkHashCode();
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ int getLastChunkHashCode();
}
/**
* Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
totalChunks_ = input.readInt32();
break;
}
+ case 64: {
+ bitField0_ |= 0x00000080;
+ lastChunkHashCode_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
return totalChunks_;
}
+ // optional int32 lastChunkHashCode = 8;
+ public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8;
+ private int lastChunkHashCode_;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public boolean hasLastChunkHashCode() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode_;
+ }
+
private void initFields() {
term_ = 0L;
leaderId_ = "";
data_ = com.google.protobuf.ByteString.EMPTY;
chunkIndex_ = 0;
totalChunks_ = 0;
+ lastChunkHashCode_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeInt32(7, totalChunks_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeInt32(8, lastChunkHashCode_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(7, totalChunks_);
}
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(8, lastChunkHashCode_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000020);
totalChunks_ = 0;
bitField0_ = (bitField0_ & ~0x00000040);
+ lastChunkHashCode_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
to_bitField0_ |= 0x00000040;
}
result.totalChunks_ = totalChunks_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.lastChunkHashCode_ = lastChunkHashCode_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
if (other.hasTotalChunks()) {
setTotalChunks(other.getTotalChunks());
}
+ if (other.hasLastChunkHashCode()) {
+ setLastChunkHashCode(other.getLastChunkHashCode());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional int32 lastChunkHashCode = 8;
+ private int lastChunkHashCode_ ;
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public boolean hasLastChunkHashCode() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public int getLastChunkHashCode() {
+ return lastChunkHashCode_;
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public Builder setLastChunkHashCode(int value) {
+ bitField0_ |= 0x00000080;
+ lastChunkHashCode_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 lastChunkHashCode = 8;</code>
+ */
+ public Builder clearLastChunkHashCode() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ lastChunkHashCode_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
}
static {
java.lang.String[] descriptorData = {
"\n\025InstallSnapshot.proto\022(org.opendayligh" +
- "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+ "t.controller.cluster.raft\"\270\001\n\017InstallSna" +
"pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
"\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
"Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
- " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
- "light.controller.protobuff.messages.clus" +
- "ter.raftB\027InstallSnapshotMessagesH\001"
+ " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" +
+ "shCode\030\010 \001(\005BX\n;org.opendaylight.control" +
+ "ler.protobuff.messages.cluster.raftB\027Ins" +
+ "tallSnapshotMessagesH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
- new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+ new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", });
return null;
}
};
optional bytes data = 5;
optional int32 chunkIndex = 6;
optional int32 totalChunks = 7;
+ optional int32 lastChunkHashCode = 8;
}