* Sets the payload data version of the follower.
*/
void setPayloadVersion(short payloadVersion);
+
+ /**
+ * @return the raft version of the follower.
+ */
+ short getRaftVersion();
+
+ /**
+ * Sets the raft version of the follower.
+ */
+ void setRaftVersion(short payloadVersion);
}
private short payloadVersion = -1;
+ // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
+ // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
+ // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
+ // HELIUM_VERSION.
+ private short raftVersion = RaftVersions.HELIUM_VERSION;
+
private final PeerInfo peerInfo;
public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
this.payloadVersion = payloadVersion;
}
+ @Override
+ public short getRaftVersion() {
+ return raftVersion;
+ }
+
+ @Override
+ public void setRaftVersion(short raftVersion) {
+ this.raftVersion = raftVersion;
+ }
+
@Override
public String toString() {
return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
public interface RaftVersions {
short HELIUM_VERSION = 0;
short LITHIUM_VERSION = 1;
- short CURRENT_VERSION = LITHIUM_VERSION;
+ short BERYLLIUM_SR3_VERSION = 3;
+ short CURRENT_VERSION = BERYLLIUM_SR3_VERSION;
}
if(AppendEntries.isSerializedType(serializable)){
return AppendEntries.fromSerializable(serializable);
- } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+ } else if (InstallSnapshot.isSerializedType(serializable)) {
return InstallSnapshot.fromSerializable(serializable);
}
return serializable;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
followerLogInformation.markFollowerActive();
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
+ followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
boolean updated = false;
if (appendEntriesReply.isSuccess()) {
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
+ byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
followerToSnapshot.incrementChunkIndex(),
followerToSnapshot.getTotalChunks(),
Optional.of(followerToSnapshot.getLastChunkHashCode())
- ).toSerializable(),
+ ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
actor()
);
* Acccepts snaphot as ByteString, enters into map for future chunks
* creates and return a ByteString chunk
*/
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
- ByteString nextChunk = followerToSnapshot.getNextChunk();
+ byte[] nextChunk = followerToSnapshot.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
return nextChunk;
}
}
}
- public ByteString getNextChunk() {
+ public byte[] getNextChunk() {
int snapshotLength = getSnapshotBytes().size();
int start = incrementOffset();
int size = context.getConfigParams().getSnapshotChunkSize();
if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
}
+ byte[] nextChunk = new byte[size];
+ getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+ nextChunkHashCode = Arrays.hashCode(nextChunk);
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
+ snapshotLength, start, size, nextChunkHashCode);
+ return nextChunk;
}
/**
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
- LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
- logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
leaderId = installSnapshot.getLeaderId();
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import java.util.Arrays;
import org.slf4j.Logger;
/**
* @return true when the lastChunk is received
* @throws InvalidChunkException
*/
- boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+ boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+ LOG.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
+ chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode);
+
if(sealed){
throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
}
if(lastChunkHashCode.isPresent()){
if(lastChunkHashCode.get() != this.lastChunkHashCode){
throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
- "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+ "the senders hash code, expected " + this.lastChunkHashCode + " was " + lastChunkHashCode.get());
}
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Chunk={},collectedChunks.size:{}",
- chunkIndex, collectedChunks.size());
- }
-
sealed = (chunkIndex == totalChunks);
lastChunkIndex = chunkIndex;
- collectedChunks = collectedChunks.concat(chunk);
- this.lastChunkHashCode = chunk.hashCode();
+ collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk));
+ this.lastChunkHashCode = Arrays.hashCode(chunk);
return sealed;
}
return term;
}
- public void setTerm(long term) {
+ protected void setTerm(long term) {
this.term = term;
}
}
package org.opendaylight.controller.cluster.raft.messages;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+
/**
* Reply for the AppendEntriesRpc message
*/
private final short payloadVersion;
+ private final short raftVersion = RaftVersions.CURRENT_VERSION;
+
private final boolean forceInstallSnapshot;
public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
this.forceInstallSnapshot = forceInstallSnapshot;
}
-
- @Override
- public long getTerm() {
- return term;
- }
-
public boolean isSuccess() {
return success;
}
return payloadVersion;
}
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex)
- .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId)
- .append(", payloadVersion=").append(", forceInstallSnapshot=").append(forceInstallSnapshot)
- .append(payloadVersion).append("]");
- return builder.toString();
+ public short getRaftVersion() {
+ return raftVersion;
}
public boolean isForceInstallSnapshot() {
return forceInstallSnapshot;
}
+
+ @Override
+ public String toString() {
+ return "AppendEntriesReply [term=" + getTerm() + ", success=" + success + ", followerId=" + followerId
+ + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot="
+ + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]";
+ }
}
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
-public class InstallSnapshot extends AbstractRaftRPC {
-
- public static final Class<InstallSnapshotMessages.InstallSnapshot> SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
private static final long serialVersionUID = 1L;
+ public static final Class<InstallSnapshotMessages.InstallSnapshot> SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
- private final String leaderId;
- private final long lastIncludedIndex;
- private final long lastIncludedTerm;
- private final ByteString data;
- private final int chunkIndex;
- private final int totalChunks;
- private final Optional<Integer> lastChunkHashCode;
+ private String leaderId;
+ private long lastIncludedIndex;
+ private long lastIncludedTerm;
+ private byte[] data;
+ private int chunkIndex;
+ private int totalChunks;
+ private Optional<Integer> lastChunkHashCode;
+
+ /**
+ * Empty constructor to satisfy Externalizable.
+ */
+ public InstallSnapshot() {
+ }
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
+ long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
}
public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
- long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+ long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks) {
this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
}
-
public String getLeaderId() {
return leaderId;
}
return lastIncludedTerm;
}
- public ByteString getData() {
+ public byte[] getData() {
return data;
}
return lastChunkHashCode;
}
- public <T extends Object> Object toSerializable(){
- InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
- .setTerm(this.getTerm())
- .setLeaderId(this.getLeaderId())
- .setChunkIndex(this.getChunkIndex())
- .setData(this.getData())
- .setLastIncludedIndex(this.getLastIncludedIndex())
- .setLastIncludedTerm(this.getLastIncludedTerm())
- .setTotalChunks(this.getTotalChunks());
-
- if(lastChunkHashCode.isPresent()){
- builder.setLastChunkHashCode(lastChunkHashCode.get());
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(RaftVersions.CURRENT_VERSION);
+ out.writeLong(getTerm());
+ out.writeUTF(leaderId);
+ out.writeLong(lastIncludedIndex);
+ out.writeLong(lastIncludedTerm);
+ out.writeInt(chunkIndex);
+ out.writeInt(totalChunks);
+
+ out.writeByte(lastChunkHashCode.isPresent() ? 1 : 0);
+ if(lastChunkHashCode.isPresent()) {
+ out.writeInt(lastChunkHashCode.get().intValue());
}
- return builder.build();
- }
- public static InstallSnapshot fromSerializable (Object o) {
- InstallSnapshotMessages.InstallSnapshot from =
- (InstallSnapshotMessages.InstallSnapshot) o;
+ out.writeObject(data);
+ }
- Optional<Integer> lastChunkHashCode = Optional.absent();
- if(from.hasLastChunkHashCode()){
- lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ in.readShort(); // raft version - not currently used
+ setTerm(in.readLong());
+ leaderId = in.readUTF();
+ lastIncludedIndex = in.readLong();
+ lastIncludedTerm = in.readLong();
+ chunkIndex = in.readInt();
+ totalChunks = in.readInt();
+
+ lastChunkHashCode = Optional.absent();
+ boolean chunkHashCodePresent = in.readByte() == 1;
+ if(chunkHashCodePresent) {
+ lastChunkHashCode = Optional.of(in.readInt());
}
- InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
- from.getLeaderId(), from.getLastIncludedIndex(),
- from.getLastIncludedTerm(), from.getData(),
- from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+ data = (byte[])in.readObject();
+ }
- return installSnapshot;
+ public <T extends Object> Object toSerializable(short version) {
+ if(version >= RaftVersions.CURRENT_VERSION) {
+ return this;
+ } else {
+ InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ .setTerm(this.getTerm())
+ .setLeaderId(this.getLeaderId())
+ .setChunkIndex(this.getChunkIndex())
+ .setData(ByteString.copyFrom(getData()))
+ .setLastIncludedIndex(this.getLastIncludedIndex())
+ .setLastIncludedTerm(this.getLastIncludedTerm())
+ .setTotalChunks(this.getTotalChunks());
+
+ if(lastChunkHashCode.isPresent()){
+ builder.setLastChunkHashCode(lastChunkHashCode.get());
+ }
+ return builder.build();
+ }
}
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("InstallSnapshot [term=").append(term).append(", leaderId=").append(leaderId)
- .append(", lastIncludedIndex=").append(lastIncludedIndex).append(", lastIncludedTerm=")
- .append(lastIncludedTerm).append(", data=").append(data).append(", chunkIndex=").append(chunkIndex)
- .append(", totalChunks=").append(totalChunks).append(", lastChunkHashCode=").append(lastChunkHashCode)
- .append("]");
- return builder.toString();
+ return "InstallSnapshot [term=" + getTerm() + ", leaderId=" + leaderId + ", lastIncludedIndex="
+ + lastIncludedIndex + ", lastIncludedTerm=" + lastIncludedTerm + ", datasize=" + data.length
+ + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode + "]";
+ }
+
+ public static InstallSnapshot fromSerializable (Object o) {
+ if(o instanceof InstallSnapshot) {
+ return (InstallSnapshot)o;
+ } else {
+ InstallSnapshotMessages.InstallSnapshot from =
+ (InstallSnapshotMessages.InstallSnapshot) o;
+
+ Optional<Integer> lastChunkHashCode = Optional.absent();
+ if(from.hasLastChunkHashCode()){
+ lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+ }
+
+ InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+ from.getLeaderId(), from.getLastIncludedIndex(),
+ from.getLastIncludedTerm(), from.getData().toByteArray(),
+ from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+
+ return installSnapshot;
+ }
+ }
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof InstallSnapshot || message instanceof InstallSnapshotMessages.InstallSnapshot;
}
}
}
}
+ protected static final int SNAPSHOT_CHUNK_SIZE = 100;
+
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected final TestActorFactory factory = new TestActorFactory(getSystem());
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ configParams.setSnapshotChunkSize(SNAPSHOT_CHUNK_SIZE);
return configParams;
}
List<ReplicatedLogEntry> unAppliedEntry;
ApplySnapshot applySnapshot;
InstallSnapshot installSnapshot;
- InstallSnapshotReply installSnapshotReply;
testLog.info("testInstallSnapshotToLaggingFollower starting");
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
+ // the snapshot store because the second snapshot was initiated by the follower install snapshot and
+ // not because the batch count was reached so the persisted journal sequence number wasn't advanced
+ // far enough to cause the previous snapshot to be deleted. This is because
+ // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
+ // This is OK - the next snapshot should delete it. In production, even if the system restarted
+ // before another snapshot, they would both get applied which wouldn't hurt anything.
+ persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+ Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+ Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
+ unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+
+ int snapshotSize = persistedSnapshot.getState().length;
+ int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
+
installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
- assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
+ assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
//assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
- installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
- assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
- assertEquals("InstallSnapshotReply getChunkIndex", 1, installSnapshotReply.getChunkIndex());
- assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
- assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
+ leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
+ int index = 1;
+ for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
+ assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
+ assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
+ assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
+ assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ }
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
// the log. In addition replicatedToAllIndex should've advanced.
verifyLeadersTrimmedLog(lastAppliedIndex);
- // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
- // the snapshot store because the second snapshot was initiated by the follower install snapshot and
- // not because the batch count was reached so the persisted journal sequence number wasn't advanced
- // far enough to cause the previous snapshot to be deleted. This is because
- // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
- // This is OK - the next snapshot should delete it. In production, even if the system restarted
- // before another snapshot, they would both get applied which wouldn't hurt anything.
- persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
- Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
- Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
- unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
- assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
-
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
InstallSnapshot lastInstallSnapshot = null;
for(int i = 0; i < totalChunks; i++) {
- ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
follower.handleMessage(leaderActor, lastInstallSnapshot);
assertTrue(totalChunks > 1);
// Send an install snapshot with the first chunk to start the process of installing a snapshot
- ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, 1, totalChunks));
InstallSnapshot lastInstallSnapshot = null;
for(int i = 0; i < totalChunks; i++) {
- ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
chunkData, chunkIndex, totalChunks);
follower.handleMessage(leaderActor, lastInstallSnapshot);
assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
}
- public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
+ public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
int snapshotLength = bs.size();
int start = offset;
int size = chunkSize;
size = snapshotLength - start;
}
}
- return bs.substring(start, start + size);
+
+ byte[] nextChunk = new byte[size];
+ bs.copyTo(nextChunk, start, 0, size);
+ return nextChunk;
}
private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
assertEquals(3, installSnapshot.getTotalChunks());
assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
- int hashCode = installSnapshot.getData().hashCode();
+ int hashCode = Arrays.hashCode(installSnapshot.getData());
followerActor.underlyingActor().clear();
j = barray.length;
}
- ByteString chunk = fts.getNextChunk();
- assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+ byte[] chunk = fts.getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
fts.markSendStatus(true);
leader = new Leader(leaderActorContext);
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+ assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
short payloadVersion = 5;
AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
- FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals(2, followerInfo.getMatchIndex());
+ assertEquals(3, followerInfo.getNextIndex());
assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+ assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
}
@Test
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
Map<String, String> data;
ByteString byteString;
- ByteString chunk1;
- ByteString chunk2;
- ByteString chunk3;
+ byte[] chunk1;
+ byte[] chunk2;
+ byte[] chunk3;
@Before
public void setup(){
SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
- tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
- tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
- tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+ tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+ tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
+ tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
byte[] snapshot = tracker2.getSnapshot();
public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
- ByteString chunks = chunk1.concat(chunk2);
+ ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
- tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
- tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+ tracker1.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+ tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
assertEquals(chunks, tracker1.getCollectedChunks());
}
- public ByteString getNextChunk (ByteString bs, int offset, int size){
+ public byte[] getNextChunk (ByteString bs, int offset, int size){
int snapshotLength = bs.size();
int start = offset;
if (size > snapshotLength) {
size = snapshotLength - start;
}
}
- return bs.substring(start, start + size);
+
+ byte[] nextChunk = new byte[size];
+ bs.copyTo(nextChunk, start, 0, size);
+ return nextChunk;
}
private static ByteString toByteString(Map<String, String> state) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.messages;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import com.google.common.base.Optional;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+
+/**
+ * Unit tests for InstallSnapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class InstallSnapshotTest {
+
+ @Test
+ public void testSerialization() {
+ byte[] data = new byte[1000];
+ int j = 0;
+ for(int i = 0; i < data.length; i++) {
+ data[i] = (byte)j;
+ if(++j >= 255) {
+ j = 0;
+ }
+ }
+
+ InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+
+ Object serialized = expected.toSerializable(RaftVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", InstallSnapshot.class, serialized.getClass());
+
+ InstallSnapshot actual = InstallSnapshot.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ verifyInstallSnapshot(expected, actual);
+
+ expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6);
+ actual = InstallSnapshot.fromSerializable(SerializationUtils.clone(
+ (Serializable) expected.toSerializable(RaftVersions.CURRENT_VERSION)));
+ verifyInstallSnapshot(expected, actual);
+ }
+
+ @Test
+ public void testSerializationWithPreBeSR3Version() {
+ byte[] data = {0,1,2,3,4,5,7,8,9};
+ InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+
+ Object serialized = expected.toSerializable(RaftVersions.LITHIUM_VERSION);
+ assertEquals("Serialized type", InstallSnapshot.SERIALIZABLE_CLASS, serialized.getClass());
+
+ InstallSnapshot actual = InstallSnapshot.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ verifyInstallSnapshot(expected, actual);
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ assertEquals("isSerializedType", true, InstallSnapshot.isSerializedType(
+ InstallSnapshotMessages.InstallSnapshot.newBuilder().build()));
+ assertEquals("isSerializedType", true, InstallSnapshot.isSerializedType(new InstallSnapshot()));
+ assertEquals("isSerializedType", false, InstallSnapshot.isSerializedType(new Object()));
+ }
+
+ private void verifyInstallSnapshot(InstallSnapshot expected, InstallSnapshot actual) {
+ assertEquals("getTerm", expected.getTerm(), actual.getTerm());
+ assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
+ assertEquals("getTotalChunks", expected.getTotalChunks(), actual.getTotalChunks());
+ assertEquals("getLastIncludedTerm", expected.getLastIncludedTerm(), actual.getLastIncludedTerm());
+ assertEquals("getLastIncludedIndex", expected.getLastIncludedIndex(), actual.getLastIncludedIndex());
+ assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
+ assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
+ assertArrayEquals("getData", expected.getData(), actual.getData());
+ assertEquals("getLastChunkHashCode present", expected.getLastChunkHashCode().isPresent(),
+ actual.getLastChunkHashCode().isPresent());
+ if(expected.getLastChunkHashCode().isPresent()) {
+ assertEquals("getLastChunkHashCode", expected.getLastChunkHashCode().get(),
+ actual.getLastChunkHashCode().get());
+ }
+ }
+}
} else if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(o.getClass()) || o instanceof AppendEntries) {
AppendEntries req = AppendEntries.fromSerializable(o);
handleAppendEntries(req);
- } else if(InstallSnapshot.SERIALIZABLE_CLASS.equals(o.getClass())) {
+ } else if(InstallSnapshot.isSerializedType(o)) {
InstallSnapshot req = InstallSnapshot.fromSerializable(o);
handleInstallSnapshot(req);
} else if(o instanceof InstallSnapshot){