public class SerializationUtils {
public static Object fromSerializable(Object serializable){
- if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+ if (InstallSnapshot.isSerializedType(serializable)) {
return InstallSnapshot.fromSerializable(serializable);
}
return serializable;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
+ byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
- ).toSerializable(),
+ ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
);
* Acccepts snaphot as ByteString, enters into map for future chunks
* creates and return a ByteString chunk
*/
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
- ByteString nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = followerToSnapshot.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
return nextChunk;
}
}
}
- public ByteString getNextChunk() {
+ public byte[] getNextChunk() {
int snapshotLength = getSnapshotBytes().size();
int start = incrementOffset();
int size = context.getConfigParams().getSnapshotChunkSize();
if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
}
+ byte[] nextChunk = new byte[size];
+ getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+ nextChunkHashCode = Arrays.hashCode(nextChunk);
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
+ snapshotLength, start, size, nextChunkHashCode);
+ return nextChunk;
}
/**
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
- LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
- logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
leaderId = installSnapshot.getLeaderId();
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import java.util.Arrays;
import org.slf4j.Logger;
/**
* @return true when the lastChunk is received
* @throws InvalidChunkException
*/
- boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+ boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+ LOG.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
+ chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode);
+
if(sealed){
throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
}
if(lastChunkHashCode.isPresent()){
if(lastChunkHashCode.get() != this.lastChunkHashCode){
throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
- "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+ "the senders hash code, expected " + this.lastChunkHashCode + " was " + lastChunkHashCode.get());
}
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Chunk={},collectedChunks.size:{}",
- chunkIndex, collectedChunks.size());
- }
-
sealed = (chunkIndex == totalChunks);
lastChunkIndex = chunkIndex;
- collectedChunks = collectedChunks.concat(chunk);
- this.lastChunkHashCode = chunk.hashCode();
+ collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk));
+ this.lastChunkHashCode = Arrays.hashCode(chunk);
return sealed;
}
return term;
}
- public void setTerm(long term) {
+ protected void setTerm(long term) {
this.term = term;
}
}
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
-public class InstallSnapshot extends AbstractRaftRPC {
-
- public static final Class<InstallSnapshotMessages.InstallSnapshot> SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
private static final long serialVersionUID = 1L;
+ public static final Class<InstallSnapshotMessages.InstallSnapshot> SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
- private final String leaderId;
- private final long lastIncludedIndex;
- private final long lastIncludedTerm;
- private final ByteString data;
- private final int chunkIndex;
- private final int totalChunks;
- private final Optional<Integer> lastChunkHashCode;
+ private String leaderId;
+ private long lastIncludedIndex;
+ private long lastIncludedTerm;
+ private byte[] data;
+ private int chunkIndex;
+ private int totalChunks;
+ private Optional<Integer> lastChunkHashCode;
+
+ /**
+ * Empty constructor to satisfy Externalizable.
+ */
+ public InstallSnapshot() {
+ }
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
+ long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
}
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks) {
this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
}
-
public String getLeaderId() {
return leaderId;
}
return lastIncludedTerm;
}
- public ByteString getData() {
+ public byte[] getData() {
return data;
}
return lastChunkHashCode;
}
- public <T extends Object> Object toSerializable(){
- InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
- .setTerm(this.getTerm())
- .setLeaderId(this.getLeaderId())
- .setChunkIndex(this.getChunkIndex())
- .setData(this.getData())
- .setLastIncludedIndex(this.getLastIncludedIndex())
- .setLastIncludedTerm(this.getLastIncludedTerm())
- .setTotalChunks(this.getTotalChunks());
-
- if(lastChunkHashCode.isPresent()){
- builder.setLastChunkHashCode(lastChunkHashCode.get());
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(RaftVersions.CURRENT_VERSION);
+ out.writeLong(getTerm());
+ out.writeUTF(leaderId);
+ out.writeLong(lastIncludedIndex);
+ out.writeLong(lastIncludedTerm);
+ out.writeInt(chunkIndex);
+ out.writeInt(totalChunks);
+
+ out.writeByte(lastChunkHashCode.isPresent() ? 1 : 0);
+ if(lastChunkHashCode.isPresent()) {
+ out.writeInt(lastChunkHashCode.get().intValue());
}
- return builder.build();
- }
- public static InstallSnapshot fromSerializable (Object o) {
- InstallSnapshotMessages.InstallSnapshot from =
- (InstallSnapshotMessages.InstallSnapshot) o;
+ out.writeObject(data);
+ }
- Optional<Integer> lastChunkHashCode = Optional.absent();
- if(from.hasLastChunkHashCode()){
- lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ in.readShort(); // raft version - not currently used
+ setTerm(in.readLong());
+ leaderId = in.readUTF();
+ lastIncludedIndex = in.readLong();
+ lastIncludedTerm = in.readLong();
+ chunkIndex = in.readInt();
+ totalChunks = in.readInt();
+
+ lastChunkHashCode = Optional.absent();
+ boolean chunkHashCodePresent = in.readByte() == 1;
+ if(chunkHashCodePresent) {
+ lastChunkHashCode = Optional.of(in.readInt());
}
- InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
- from.getLeaderId(), from.getLastIncludedIndex(),
- from.getLastIncludedTerm(), from.getData(),
- from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+ data = (byte[])in.readObject();
+ }
- return installSnapshot;
+ public <T extends Object> Object toSerializable(short version) {
+ if(version >= RaftVersions.BORON_VERSION) {
+ return this;
+ } else {
+ InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ .setTerm(this.getTerm())
+ .setLeaderId(this.getLeaderId())
+ .setChunkIndex(this.getChunkIndex())
+ .setData(ByteString.copyFrom(getData()))
+ .setLastIncludedIndex(this.getLastIncludedIndex())
+ .setLastIncludedTerm(this.getLastIncludedTerm())
+ .setTotalChunks(this.getTotalChunks());
+
+ if(lastChunkHashCode.isPresent()){
+ builder.setLastChunkHashCode(lastChunkHashCode.get());
+ }
+ return builder.build();
+ }
}
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("InstallSnapshot [term=").append(getTerm()).append(", leaderId=").append(leaderId)
- .append(", lastIncludedIndex=").append(lastIncludedIndex).append(", lastIncludedTerm=")
- .append(lastIncludedTerm).append(", data=").append(data).append(", chunkIndex=").append(chunkIndex)
- .append(", totalChunks=").append(totalChunks).append(", lastChunkHashCode=").append(lastChunkHashCode)
- .append("]");
- return builder.toString();
+ return "InstallSnapshot [term=" + getTerm() + ", leaderId=" + leaderId + ", lastIncludedIndex="
+ + lastIncludedIndex + ", lastIncludedTerm=" + lastIncludedTerm + ", datasize=" + data.length
+ + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode + "]";
+ }
+
+ public static InstallSnapshot fromSerializable (Object o) {
+ if(o instanceof InstallSnapshot) {
+ return (InstallSnapshot)o;
+ } else {
+ InstallSnapshotMessages.InstallSnapshot from =
+ (InstallSnapshotMessages.InstallSnapshot) o;
+
+ Optional<Integer> lastChunkHashCode = Optional.absent();
+ if(from.hasLastChunkHashCode()){
+ lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+ }
+
+ InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+ from.getLeaderId(), from.getLastIncludedIndex(),
+ from.getLastIncludedTerm(), from.getData().toByteArray(),
+ from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+
+ return installSnapshot;
+ }
+ }
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof InstallSnapshot || message instanceof InstallSnapshotMessages.InstallSnapshot;
}
}
}
}
+ protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected final TestActorFactory factory = new TestActorFactory(getSystem());
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
return configParams;
}
List<ReplicatedLogEntry> unAppliedEntry;
ApplySnapshot applySnapshot;
InstallSnapshot installSnapshot;
- InstallSnapshotReply installSnapshotReply;
testLog.info("testInstallSnapshotToLaggingFollower starting");
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
+ // the snapshot store because the second snapshot was initiated by the follower install snapshot and
+ // not because the batch count was reached so the persisted journal sequence number wasn't advanced
+ // far enough to cause the previous snapshot to be deleted. This is because
+ // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
+ // This is OK - the next snapshot should delete it. In production, even if the system restarted
+ // before another snapshot, they would both get applied which wouldn't hurt anything.
+ persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+ Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+ Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
+ unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+
+ int snapshotSize = persistedSnapshot.getState().length;
+ int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
+
installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
- assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
+ assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
//assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
- installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
- assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
- assertEquals("InstallSnapshotReply getChunkIndex", 1, installSnapshotReply.getChunkIndex());
- assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
- assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
+ leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
+ int index = 1;
+ for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
+ assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
+ assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
+ assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
+ assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ }
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
// the log. In addition replicatedToAllIndex should've advanced.
verifyLeadersTrimmedLog(lastAppliedIndex);
- // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
- // the snapshot store because the second snapshot was initiated by the follower install snapshot and
- // not because the batch count was reached so the persisted journal sequence number wasn't advanced
- // far enough to cause the previous snapshot to be deleted. This is because
- // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
- // This is OK - the next snapshot should delete it. In production, even if the system restarted
- // before another snapshot, they would both get applied which wouldn't hurt anything.
- persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
- Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
- Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
- unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
- assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
-
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
InstallSnapshot lastInstallSnapshot = null;
for(int i = 0; i < totalChunks; i++) {
- ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
follower.handleMessage(leaderActor, lastInstallSnapshot);
assertTrue(totalChunks > 1);
// Send an install snapshot with the first chunk to start the process of installing a snapshot
- ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, 1, totalChunks));
InstallSnapshot lastInstallSnapshot = null;
for(int i = 0; i < totalChunks; i++) {
- ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
follower.handleMessage(leaderActor, lastInstallSnapshot);
assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
}
- public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
+ public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
int snapshotLength = bs.size();
int start = offset;
int size = chunkSize;
size = snapshotLength - start;
}
}
- return bs.substring(start, start + size);
+
+ byte[] nextChunk = new byte[size];
+ bs.copyTo(nextChunk, start, 0, size);
+ return nextChunk;
}
private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
assertEquals(3, installSnapshot.getTotalChunks());
assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
- int hashCode = installSnapshot.getData().hashCode();
+ int hashCode = Arrays.hashCode(installSnapshot.getData());
followerActor.underlyingActor().clear();
j = barray.length;
}
- ByteString chunk = fts.getNextChunk();
- assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+ byte[] chunk = fts.getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
fts.markSendStatus(true);
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
Map<String, String> data;
ByteString byteString;
- ByteString chunk1;
- ByteString chunk2;
- ByteString chunk3;
+ byte[] chunk1;
+ byte[] chunk2;
+ byte[] chunk3;
@Before
public void setup(){
SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
- tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
- tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
- tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+ tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+ tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
+ tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
byte[] snapshot = tracker2.getSnapshot();
public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
- ByteString chunks = chunk1.concat(chunk2);
+ ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
- tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
- tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker1.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+ tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
assertEquals(chunks, tracker1.getCollectedChunks());
}
- public ByteString getNextChunk (ByteString bs, int offset, int size){
+ public byte[] getNextChunk (ByteString bs, int offset, int size){
int snapshotLength = bs.size();
int start = offset;
if (size > snapshotLength) {
size = snapshotLength - start;
}
}
- return bs.substring(start, start + size);
+
+ byte[] nextChunk = new byte[size];
+ bs.copyTo(nextChunk, start, 0, size);
+ return nextChunk;
}
private static ByteString toByteString(Map<String, String> state) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.messages;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import com.google.common.base.Optional;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+
+/**
+ * Unit tests for InstallSnapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class InstallSnapshotTest {
+
+ @Test
+ public void testSerialization() {
+ byte[] data = new byte[1000];
+ int j = 0;
+ for(int i = 0; i < data.length; i++) {
+ data[i] = (byte)j;
+ if(++j >= 255) {
+ j = 0;
+ }
+ }
+
+ InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+
+ Object serialized = expected.toSerializable(RaftVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", InstallSnapshot.class, serialized.getClass());
+
+ InstallSnapshot actual = InstallSnapshot.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ verifyInstallSnapshot(expected, actual);
+
+ expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6);
+ actual = InstallSnapshot.fromSerializable(SerializationUtils.clone(
+ (Serializable) expected.toSerializable(RaftVersions.CURRENT_VERSION)));
+ verifyInstallSnapshot(expected, actual);
+ }
+
+ @Test
+ public void testSerializationWithPreBoronVersion() {
+ byte[] data = {0,1,2,3,4,5,7,8,9};
+ InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+
+ Object serialized = expected.toSerializable(RaftVersions.LITHIUM_VERSION);
+ assertEquals("Serialized type", InstallSnapshot.SERIALIZABLE_CLASS, serialized.getClass());
+
+ InstallSnapshot actual = InstallSnapshot.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ verifyInstallSnapshot(expected, actual);
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, InstallSnapshot.isSerializedType(
+ InstallSnapshotMessages.InstallSnapshot.newBuilder().build()));
+ assertEquals("isSerializedType", true, InstallSnapshot.isSerializedType(new InstallSnapshot()));
+ assertEquals("isSerializedType", false, InstallSnapshot.isSerializedType(new Object()));
+ }
+
+ private void verifyInstallSnapshot(InstallSnapshot expected, InstallSnapshot actual) {
+ assertEquals("getTerm", expected.getTerm(), actual.getTerm());
+ assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
+ assertEquals("getTotalChunks", expected.getTotalChunks(), actual.getTotalChunks());
+ assertEquals("getLastIncludedTerm", expected.getLastIncludedTerm(), actual.getLastIncludedTerm());
+ assertEquals("getLastIncludedIndex", expected.getLastIncludedIndex(), actual.getLastIncludedIndex());
+ assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
+ assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
+ assertArrayEquals("getData", expected.getData(), actual.getData());
+ assertEquals("getLastChunkHashCode present", expected.getLastChunkHashCode().isPresent(),
+ actual.getLastChunkHashCode().isPresent());
+ if(expected.getLastChunkHashCode().isPresent()) {
+ assertEquals("getLastChunkHashCode", expected.getLastChunkHashCode().get(),
+ actual.getLastChunkHashCode().get());
+ }
+ }
+}
package org.opendaylight.controller.protobuff.messages.cluster.raft;
+@Deprecated
public final class InstallSnapshotMessages {
private InstallSnapshotMessages() {}
public static void registerAllExtensions(
return defaultInstance;
}
+ @Override
public InstallSnapshot getDefaultInstanceForType() {
return defaultInstance;
}
return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
+ @Override
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
new com.google.protobuf.AbstractParser<InstallSnapshot>() {
- public InstallSnapshot parsePartialFrom(
+ @Override
+ public InstallSnapshot parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
/**
* <code>optional int64 term = 1;</code>
*/
+ @Override
public boolean hasTerm() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional int64 term = 1;</code>
*/
+ @Override
public long getTerm() {
return term_;
}
/**
* <code>optional string leaderId = 2;</code>
*/
+ @Override
public boolean hasLeaderId() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional string leaderId = 2;</code>
*/
+ @Override
public java.lang.String getLeaderId() {
java.lang.Object ref = leaderId_;
if (ref instanceof java.lang.String) {
/**
* <code>optional string leaderId = 2;</code>
*/
+ @Override
public com.google.protobuf.ByteString
getLeaderIdBytes() {
java.lang.Object ref = leaderId_;
/**
* <code>optional int64 lastIncludedIndex = 3;</code>
*/
+ @Override
public boolean hasLastIncludedIndex() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional int64 lastIncludedIndex = 3;</code>
*/
+ @Override
public long getLastIncludedIndex() {
return lastIncludedIndex_;
}
/**
* <code>optional int64 lastIncludedTerm = 4;</code>
*/
+ @Override
public boolean hasLastIncludedTerm() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional int64 lastIncludedTerm = 4;</code>
*/
+ @Override
public long getLastIncludedTerm() {
return lastIncludedTerm_;
}
/**
* <code>optional bytes data = 5;</code>
*/
+ @Override
public boolean hasData() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional bytes data = 5;</code>
*/
+ @Override
public com.google.protobuf.ByteString getData() {
return data_;
}
/**
* <code>optional int32 chunkIndex = 6;</code>
*/
+ @Override
public boolean hasChunkIndex() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional int32 chunkIndex = 6;</code>
*/
+ @Override
public int getChunkIndex() {
return chunkIndex_;
}
/**
* <code>optional int32 totalChunks = 7;</code>
*/
+ @Override
public boolean hasTotalChunks() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional int32 totalChunks = 7;</code>
*/
+ @Override
public int getTotalChunks() {
return totalChunks_;
}
/**
* <code>optional int32 lastChunkHashCode = 8;</code>
*/
+ @Override
public boolean hasLastChunkHashCode() {
return ((bitField0_ & 0x00000080) == 0x00000080);
}
/**
* <code>optional int32 lastChunkHashCode = 8;</code>
*/
+ @Override
public int getLastChunkHashCode() {
return lastChunkHashCode_;
}
lastChunkHashCode_ = 0;
}
private byte memoizedIsInitialized = -1;
+ @Override
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
- if (isInitialized != -1) return isInitialized == 1;
+ if (isInitialized != -1) {
+ return isInitialized == 1;
+ }
memoizedIsInitialized = 1;
return true;
}
+ @Override
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
}
private int memoizedSerializedSize = -1;
+ @Override
public int getSerializedSize() {
int size = memoizedSerializedSize;
- if (size != -1) return size;
+ if (size != -1) {
+ return size;
+ }
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
}
public static Builder newBuilder() { return Builder.create(); }
+ @Override
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot prototype) {
return newBuilder().mergeFrom(prototype);
}
+ @Override
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
- protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ @Override
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
.ensureFieldAccessorsInitialized(
return new Builder();
}
- public Builder clear() {
+ @Override
+ public Builder clear() {
super.clear();
term_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
- public Builder clone() {
+ @Override
+ public Builder clone() {
return create().mergeFrom(buildPartial());
}
- public com.google.protobuf.Descriptors.Descriptor
+ @Override
+ public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
- public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+ @Override
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
}
- public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot build() {
+ @Override
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot build() {
org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
return result;
}
- public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+ @Override
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot buildPartial() {
org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
return result;
}
- public Builder mergeFrom(com.google.protobuf.Message other) {
+ @Override
+ public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) {
return mergeFrom((org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot)other);
} else {
}
public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot other) {
- if (other == org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+ if (other == org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) {
+ return this;
+ }
if (other.hasTerm()) {
setTerm(other.getTerm());
}
return this;
}
- public final boolean isInitialized() {
+ @Override
+ public final boolean isInitialized() {
return true;
}
- public Builder mergeFrom(
+ @Override
+ public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
/**
* <code>optional int64 term = 1;</code>
*/
- public boolean hasTerm() {
+ @Override
+ public boolean hasTerm() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional int64 term = 1;</code>
*/
- public long getTerm() {
+ @Override
+ public long getTerm() {
return term_;
}
/**
/**
* <code>optional string leaderId = 2;</code>
*/
- public boolean hasLeaderId() {
+ @Override
+ public boolean hasLeaderId() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional string leaderId = 2;</code>
*/
- public java.lang.String getLeaderId() {
+ @Override
+ public java.lang.String getLeaderId() {
java.lang.Object ref = leaderId_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref)
/**
* <code>optional string leaderId = 2;</code>
*/
- public com.google.protobuf.ByteString
+ @Override
+ public com.google.protobuf.ByteString
getLeaderIdBytes() {
java.lang.Object ref = leaderId_;
if (ref instanceof String) {
/**
* <code>optional int64 lastIncludedIndex = 3;</code>
*/
- public boolean hasLastIncludedIndex() {
+ @Override
+ public boolean hasLastIncludedIndex() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional int64 lastIncludedIndex = 3;</code>
*/
- public long getLastIncludedIndex() {
+ @Override
+ public long getLastIncludedIndex() {
return lastIncludedIndex_;
}
/**
/**
* <code>optional int64 lastIncludedTerm = 4;</code>
*/
- public boolean hasLastIncludedTerm() {
+ @Override
+ public boolean hasLastIncludedTerm() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional int64 lastIncludedTerm = 4;</code>
*/
- public long getLastIncludedTerm() {
+ @Override
+ public long getLastIncludedTerm() {
return lastIncludedTerm_;
}
/**
/**
* <code>optional bytes data = 5;</code>
*/
- public boolean hasData() {
+ @Override
+ public boolean hasData() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional bytes data = 5;</code>
*/
- public com.google.protobuf.ByteString getData() {
+ @Override
+ public com.google.protobuf.ByteString getData() {
return data_;
}
/**
/**
* <code>optional int32 chunkIndex = 6;</code>
*/
- public boolean hasChunkIndex() {
+ @Override
+ public boolean hasChunkIndex() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional int32 chunkIndex = 6;</code>
*/
- public int getChunkIndex() {
+ @Override
+ public int getChunkIndex() {
return chunkIndex_;
}
/**
/**
* <code>optional int32 totalChunks = 7;</code>
*/
- public boolean hasTotalChunks() {
+ @Override
+ public boolean hasTotalChunks() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional int32 totalChunks = 7;</code>
*/
- public int getTotalChunks() {
+ @Override
+ public int getTotalChunks() {
return totalChunks_;
}
/**
/**
* <code>optional int32 lastChunkHashCode = 8;</code>
*/
- public boolean hasLastChunkHashCode() {
+ @Override
+ public boolean hasLastChunkHashCode() {
return ((bitField0_ & 0x00000080) == 0x00000080);
}
/**
* <code>optional int32 lastChunkHashCode = 8;</code>
*/
- public int getLastChunkHashCode() {
+ @Override
+ public int getLastChunkHashCode() {
return lastChunkHashCode_;
}
/**
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ @Override
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
sender().tell(new RequestVoteReply(req.getTerm(), true), self());
} else if(o instanceof AppendEntries) {
handleAppendEntries((AppendEntries)o);
- } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
+ } else if(InstallSnapshot.isSerializedType(o)) {
InstallSnapshot req = InstallSnapshot.fromSerializable(o);
handleInstallSnapshot(req);
} else if(o instanceof InstallSnapshot){