import akka.actor.Props;
import akka.japi.Creator;
import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
}
}
- @Override protected Object createSnapshot() {
- return state;
+ @Override protected void createSnapshot() {
+ ByteString bs = null;
+ try {
+ bs = fromObject(state);
+ } catch (Exception e) {
+ LOG.error("Exception in creating snapshot", e);
+ }
+ getSelf().tell(new CaptureSnapshotReply(bs), null);
}
- @Override protected void applySnapshot(Object snapshot) {
+ @Override protected void applySnapshot(ByteString snapshot) {
state.clear();
- state.putAll((HashMap) snapshot);
- LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
+ try {
+ state.putAll((HashMap) toObject(snapshot));
+ } catch (Exception e) {
+ LOG.error("Exception in applying snapshot", e);
+ }
+ LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+ }
+
+ private ByteString fromObject(Object snapshot) throws Exception {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(snapshot);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ }
+
+ private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+ Object obj = null;
+ ByteArrayInputStream bis = null;
+ ObjectInputStream ois = null;
+ try {
+ bis = new ByteArrayInputStream(bs.toByteArray());
+ ois = new ObjectInputStream(bis);
+ obj = ois.readObject();
+ } finally {
+ if (bis != null) {
+ bis.close();
+ }
+ if (ois != null) {
+ ois.close();
+ }
+ }
+ return obj;
}
@Override protected void onStateChanged() {
public long getSnapshotBatchCount() {
return 50;
}
+
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
}
td.printState();
} else if (command.startsWith("printNodes")) {
td.printNodes();
+ } else {
+ System.out.println("Invalid command:" + command);
}
}
*/
package org.opendaylight.controller.cluster.raft;
+import com.google.protobuf.ByteString;
+
import java.util.ArrayList;
import java.util.List;
*/
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
- protected final List<ReplicatedLogEntry> journal;
- protected final Object snapshot;
+ protected List<ReplicatedLogEntry> journal;
+ protected ByteString snapshot;
protected long snapshotIndex = -1;
protected long snapshotTerm = -1;
- public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+ // to be used for rollback during save snapshot failure
+ protected List<ReplicatedLogEntry> snapshottedJournal;
+ protected ByteString previousSnapshot;
+ protected long previousSnapshotIndex = -1;
+ protected long previousSnapshotTerm = -1;
+
+ public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
this.snapshot = state;
this.snapshotIndex = snapshotIndex;
@Override
public boolean isInSnapshot(long logEntryIndex) {
- return logEntryIndex <= snapshotIndex;
+ return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
}
@Override
- public Object getSnapshot() {
+ public ByteString getSnapshot() {
return snapshot;
}
@Override
public abstract void removeFromAndPersist(long index);
+
+ @Override
+ public void setSnapshotIndex(long snapshotIndex) {
+ this.snapshotIndex = snapshotIndex;
+ }
+
+ @Override
+ public void setSnapshotTerm(long snapshotTerm) {
+ this.snapshotTerm = snapshotTerm;
+ }
+
+ @Override
+ public void setSnapshot(ByteString snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ public void clear(int startIndex, int endIndex) {
+ journal.subList(startIndex, endIndex).clear();
+ }
+
+ @Override
+ public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+ snapshottedJournal = new ArrayList<>(journal.size());
+
+ snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+ clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+ previousSnapshotIndex = snapshotIndex;
+ setSnapshotIndex(snapshotCapturedIndex);
+
+ previousSnapshotTerm = snapshotTerm;
+ setSnapshotTerm(snapshotCapturedTerm);
+
+ previousSnapshot = getSnapshot();
+ setSnapshot(snapshot);
+ }
+
+ @Override
+ public void snapshotCommit() {
+ snapshottedJournal.clear();
+ snapshottedJournal = null;
+ previousSnapshotIndex = -1;
+ previousSnapshotTerm = -1;
+ previousSnapshot = null;
+ }
+
+ @Override
+ public void snapshotRollback() {
+ snapshottedJournal.addAll(journal);
+ journal.clear();
+ journal = snapshottedJournal;
+ snapshottedJournal = null;
+
+ snapshotIndex = previousSnapshotIndex;
+ previousSnapshotIndex = -1;
+
+ snapshotTerm = previousSnapshotTerm;
+ previousSnapshotTerm = -1;
+
+ snapshot = previousSnapshot;
+ previousSnapshot = null;
+
+ }
}
* @return int
*/
public int getElectionTimeVariance();
+
+ /**
+ * The size (in bytes) of the snapshot chunk sent from Leader
+ */
+ public int getSnapshotChunkSize();
}
*/
private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+ private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB
+
/**
* The interval at which a heart beat message will be sent to the remote
public int getElectionTimeVariance() {
return ELECTION_TIME_MAX_VARIANCE;
}
+
+ @Override
+ public int getSnapshotChunkSize() {
+ return SNAPSHOT_CHUNK_SIZE;
+ }
}
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import java.io.Serializable;
-import java.util.List;
import java.util.Map;
/**
*/
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
+ private CaptureSnapshot captureSnapshot = null;
+
+ private volatile boolean hasSnapshotCaptureInitiated = false;
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
replicatedLog = new ReplicatedLogImpl(snapshot);
context.setReplicatedLog(replicatedLog);
+ context.setLastApplied(snapshot.getLastAppliedIndex());
LOG.debug("Applied snapshot to replicatedLog. " +
"snapshotIndex={}, snapshotTerm={}, journal-size={}",
replicatedLog.size());
// Apply the snapshot to the actors state
- applySnapshot(snapshot.getState());
+ applySnapshot(ByteString.copyFrom(snapshot.getState()));
} else if (message instanceof ReplicatedLogEntry) {
replicatedLog.append((ReplicatedLogEntry) message);
applyState.getReplicatedLogEntry().getData());
} else if(message instanceof ApplySnapshot ) {
- applySnapshot(((ApplySnapshot) message).getSnapshot());
+ Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
+
+ LOG.debug("ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ snapshot.getLastAppliedTerm());
+ applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+ //clears the followers log, sets the snapshot index to ensure adjusted-index works
+ replicatedLog = new ReplicatedLogImpl(snapshot);
+ context.setReplicatedLog(replicatedLog);
+ context.setLastApplied(snapshot.getLastAppliedIndex());
} else if (message instanceof FindLeader) {
getSender().tell(
} else if (message instanceof SaveSnapshotSuccess) {
SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+ LOG.info("SaveSnapshotSuccess received for snapshot");
+
+ context.getReplicatedLog().snapshotCommit();
// TODO: Not sure if we want to be this aggressive with trimming stuff
trimPersistentData(success.metadata().sequenceNr());
} else if (message instanceof SaveSnapshotFailure) {
+ SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
+
+ LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
+ LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
- // TODO: Handle failure in saving the snapshot
+ context.getReplicatedLog().snapshotRollback();
+
+ LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().size());
} else if (message instanceof AddRaftPeer){
RemoveRaftPeer rrp = (RemoveRaftPeer)message;
context.removePeer(rrp.getName());
+ } else if (message instanceof CaptureSnapshot) {
+ LOG.debug("CaptureSnapshot received by actor");
+ CaptureSnapshot cs = (CaptureSnapshot)message;
+ captureSnapshot = cs;
+ createSnapshot();
+
+ } else if (message instanceof CaptureSnapshotReply){
+ LOG.debug("CaptureSnapshotReply received by actor");
+ CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+
+ ByteString stateInBytes = csr.getSnapshot();
+ LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+ handleCaptureSnapshotReply(stateInBytes);
+
} else {
+ if (!(message instanceof AppendEntriesMessages.AppendEntries)
+ && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
+ LOG.debug("onReceiveCommand: message:" + message.getClass());
+ }
RaftState state =
currentBehavior.handleMessage(getSender(), message);
*
* @return The current state of the actor
*/
- protected abstract Object createSnapshot();
+ protected abstract void createSnapshot();
/**
* This method will be called by the RaftActor during recovery to
*
* @param snapshot A snapshot of the state of the actor
*/
- protected abstract void applySnapshot(Object snapshot);
+ protected abstract void applySnapshot(ByteString snapshot);
/**
* This method will be called by the RaftActor when the state of the
return peerAddress;
}
+ private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+ Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+ context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+ captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+ saveSnapshot(sn);
+
+ LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+ //be greedy and remove entries from in-mem journal which are in the snapshot
+ // and update snapshotIndex and snapshotTerm without waiting for the success,
+
+ context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+ captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ captureSnapshot = null;
+ hasSnapshotCaptureInitiated = false;
+ }
+
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
public ReplicatedLogImpl(Snapshot snapshot) {
- super(snapshot.getState(),
+ super(ByteString.copyFrom(snapshot.getState()),
snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
snapshot.getUnAppliedEntries());
}
persist(replicatedLogEntry,
new Procedure<ReplicatedLogEntry>() {
public void apply(ReplicatedLogEntry evt) throws Exception {
- // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
- if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+ // when a snaphsot is being taken, captureSnapshot != null
+ if (hasSnapshotCaptureInitiated == false &&
+ journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+
LOG.info("Initiating Snapshot Capture..");
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
- Snapshot sn = Snapshot.create(createSnapshot(),
- getFrom(context.getLastApplied() + 1),
- lastIndex(), lastTerm(), lastAppliedIndex,
- lastAppliedTerm);
- saveSnapshot(sn);
-
- LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
-
- //be greedy and remove entries from in-mem journal which are in the snapshot
- // and update snapshotIndex and snapshotTerm without waiting for the success,
- // TODO: damage-recovery to be done on failure
- journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
- snapshotIndex = lastAppliedIndex;
- snapshotTerm = lastAppliedTerm;
-
- LOG.info("Removed in-memory snapshotted entries, " +
- "adjusted snaphsotIndex:{}" +
- "and term:{}", snapshotIndex, lastAppliedTerm);
+ // send a CaptureSnapshot to self to make the expensive operation async.
+ getSelf().tell(new CaptureSnapshot(
+ lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+ null);
+ hasSnapshotCaptureInitiated = true;
}
// Send message for replication
if (clientActor != null) {
}
- private static class Snapshot implements Serializable {
- private final Object state;
- private final List<ReplicatedLogEntry> unAppliedEntries;
- private final long lastIndex;
- private final long lastTerm;
- private final long lastAppliedIndex;
- private final long lastAppliedTerm;
-
- private Snapshot(Object state,
- List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
- long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
- this.state = state;
- this.unAppliedEntries = unAppliedEntries;
- this.lastIndex = lastIndex;
- this.lastTerm = lastTerm;
- this.lastAppliedIndex = lastAppliedIndex;
- this.lastAppliedTerm = lastAppliedTerm;
- }
-
-
- public static Snapshot create(Object state,
- List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
- long lastAppliedIndex, long lastAppliedTerm) {
- return new Snapshot(state, entries, lastIndex, lastTerm,
- lastAppliedIndex, lastAppliedTerm);
- }
-
- public Object getState() {
- return state;
- }
-
- public List<ReplicatedLogEntry> getUnAppliedEntries() {
- return unAppliedEntries;
- }
-
- public long getLastTerm() {
- return lastTerm;
- }
-
- public long getLastAppliedIndex() {
- return lastAppliedIndex;
- }
-
- public long getLastAppliedTerm() {
- return lastAppliedTerm;
- }
-
- public String getLogMessage() {
- StringBuilder sb = new StringBuilder();
- return sb.append("Snapshot={")
- .append("lastTerm:" + this.getLastTerm() + ", ")
- .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ")
- .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ")
- .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}")
- .toString();
-
- }
- }
-
private class ElectionTermImpl implements ElectionTerm {
/**
* Identifier of the actor whose election term information this is
package org.opendaylight.controller.cluster.raft;
+import com.google.protobuf.ByteString;
+
import java.util.List;
/**
*
* @return an object representing the snapshot if it exists. null otherwise
*/
- Object getSnapshot();
+ ByteString getSnapshot();
/**
* Get the index of the snapshot
* otherwise
*/
long getSnapshotTerm();
+
+ /**
+ * sets the snapshot index in the replicated log
+ * @param snapshotIndex
+ */
+ void setSnapshotIndex(long snapshotIndex);
+
+ /**
+ * sets snapshot term
+ * @param snapshotTerm
+ */
+ public void setSnapshotTerm(long snapshotTerm);
+
+ /**
+ * sets the snapshot in bytes
+ * @param snapshot
+ */
+ public void setSnapshot(ByteString snapshot);
+
+ /**
+ * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
+ * @param startIndex
+ * @param endIndex
+ */
+ public void clear(int startIndex, int endIndex);
+
+ /**
+ * Handles all the bookkeeping in order to perform a rollback in the
+ * event of SaveSnapshotFailure
+ * @param snapshot
+ * @param snapshotCapturedIndex
+ * @param snapshotCapturedTerm
+ */
+ public void snapshotPreCommit(ByteString snapshot,
+ long snapshotCapturedIndex, long snapshotCapturedTerm);
+
+ /**
+ * Sets the Replicated log to state after snapshot success.
+ */
+ public void snapshotCommit();
+
+ /**
+ * Restores the replicated log to a state in the event of a save snapshot failure
+ */
+ public void snapshotRollback();
}
package org.opendaylight.controller.cluster.raft;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
public class SerializationUtils {
public static Object fromSerializable(Object serializable){
if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
return AppendEntries.fromSerializable(serializable);
+
+ } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) {
+ return InstallSnapshot.fromSerializable(serializable);
}
return serializable;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+public class Snapshot implements Serializable {
+ private final byte[] state;
+ private final List<ReplicatedLogEntry> unAppliedEntries;
+ private final long lastIndex;
+ private final long lastTerm;
+ private final long lastAppliedIndex;
+ private final long lastAppliedTerm;
+
+ private Snapshot(byte[] state,
+ List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+ long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+ this.state = state;
+ this.unAppliedEntries = unAppliedEntries;
+ this.lastIndex = lastIndex;
+ this.lastTerm = lastTerm;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+
+ public static Snapshot create(byte[] state,
+ List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm) {
+ return new Snapshot(state, entries, lastIndex, lastTerm,
+ lastAppliedIndex, lastAppliedTerm);
+ }
+
+ public byte[] getState() {
+ return state;
+ }
+
+ public List<ReplicatedLogEntry> getUnAppliedEntries() {
+ return unAppliedEntries;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getLastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ public long getLastAppliedTerm() {
+ return lastAppliedTerm;
+ }
+
+ public long getLastIndex() {
+ return this.lastIndex;
+ }
+
+ public String getLogMessage() {
+ StringBuilder sb = new StringBuilder();
+ return sb.append("Snapshot={")
+ .append("lastTerm:" + this.getLastTerm() + ", ")
+ .append("lastIndex:" + this.getLastIndex() + ", ")
+ .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ")
+ .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ")
+ .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}")
+ .toString();
+
+ }
+}
package org.opendaylight.controller.cluster.raft.base.messages;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+
import java.io.Serializable;
+/**
+ * Internal message, issued by follower to its actor
+ */
public class ApplySnapshot implements Serializable {
- private final Object snapshot;
+ private final Snapshot snapshot;
- public ApplySnapshot(Object snapshot) {
+ public ApplySnapshot(Snapshot snapshot) {
this.snapshot = snapshot;
}
- public Object getSnapshot() {
+ public Snapshot getSnapshot() {
return snapshot;
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+public class CaptureSnapshot {
+ private long lastAppliedIndex;
+ private long lastAppliedTerm;
+ private long lastIndex;
+ private long lastTerm;
+
+ public CaptureSnapshot(long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm) {
+ this.lastIndex = lastIndex;
+ this.lastTerm = lastTerm;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+ public long getLastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ public long getLastAppliedTerm() {
+ return lastAppliedTerm;
+ }
+
+ public long getLastIndex() {
+ return lastIndex;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import com.google.protobuf.ByteString;
+
+public class CaptureSnapshotReply {
+ private ByteString snapshot;
+
+ public CaptureSnapshotReply(ByteString snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ public ByteString getSnapshot() {
+ return snapshot;
+ }
+
+ public void setSnapshot(ByteString snapshot) {
+ this.snapshot = snapshot;
+ }
+}
* @param index a log index that is known to be committed
*/
protected void applyLogToStateMachine(final long index) {
+ long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1;
i < index + 1; i++) {
if (replicatedLogEntry != null) {
actor().tell(new ApplyState(clientActor, identifier,
replicatedLogEntry), actor());
+ newLastApplied = i;
} else {
+ //if one index is not present in the log, no point in looping
+ // around as the rest wont be present either
context.getLogger().error(
- "Missing index " + i + " from log. Cannot apply state.");
+ "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+ break;
}
}
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- context.getLogger().debug("Setting last applied to {}", index);
- context.setLastApplied(index);
+ context.getLogger().debug("Setting last applied to {}", newLastApplied);
+ context.setLastApplied(newLastApplied);
}
protected Object fromSerializableMessage(Object serializable){
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import java.util.ArrayList;
+
/**
* The behavior of a RaftActor in the Follower state
* <p/>
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
+ private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
public Follower(RaftActorContext context) {
super(context);
if (outOfSync) {
// We found that the log was out of sync so just send a negative
// reply and return
+ context.getLogger().debug("Follower is out-of-sync, " +
+ "so sending negative reply, lastIndex():{}, lastTerm():{}",
+ lastIndex(), lastTerm());
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
// If commitIndex > lastApplied: increment lastApplied, apply
// log[lastApplied] to state machine (§5.3)
- if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+ // check if there are any entries to be applied. last-applied can be equal to last-index
+ if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
+ context.getLastApplied() < lastIndex()) {
+ context.getLogger().debug("applyLogToStateMachine, " +
+ "appendEntries.getLeaderCommit():{}," +
+ "context.getLastApplied():{}, lastIndex():{}",
+ appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex());
applyLogToStateMachine(appendEntries.getLeaderCommit());
}
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
- actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
+ handleInstallSnapshot(sender, installSnapshot);
}
scheduleElection(electionDuration());
return super.handleMessage(sender, message);
}
+ private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+ context.getLogger().debug("InstallSnapshot received by follower " +
+ "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(),
+ installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+
+ try {
+ if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
+ // this is the last chunk, create a snapshot object and apply
+
+ snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
+ snapshotChunksCollected.size());
+
+ Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
+ new ArrayList<ReplicatedLogEntry>(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm(),
+ installSnapshot.getLastIncludedIndex(),
+ installSnapshot.getLastIncludedTerm());
+
+ actor().tell(new ApplySnapshot(snapshot), actor());
+
+ } else {
+ // we have more to go
+ snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+ context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}",
+ installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
+ }
+
+ sender.tell(new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+ true), actor());
+
+ } catch (Exception e) {
+ context.getLogger().error("Exception in InstallSnapshot of follower", e);
+ //send reply with success as false. The chunk will be sent again on failure
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+ installSnapshot.getChunkIndex(), false), actor());
+ }
+ }
+
@Override public void close() throws Exception {
stopElection();
}
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
public class Leader extends AbstractRaftActorBehavior {
- private final Map<String, FollowerLogInformation> followerToLog =
+ protected final Map<String, FollowerLogInformation> followerToLog =
new HashMap();
+ protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private final Set<String> followers;
return super.handleMessage(sender, message);
}
- private void handleInstallSnapshotReply(InstallSnapshotReply message) {
- InstallSnapshotReply reply = message;
+ private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
String followerId = reply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
+ FollowerToSnapshot followerToSnapshot =
+ mapFollowerToSnapshot.get(followerId);
+
+ if (followerToSnapshot != null &&
+ followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+ if (reply.isSuccess()) {
+ if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+ //this was the last chunk reply
+ context.getLogger().debug("InstallSnapshotReply received, " +
+ "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+ reply.getChunkIndex(), followerId,
+ context.getReplicatedLog().getSnapshotIndex() + 1);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+ followerLogInformation.setMatchIndex(
+ context.getReplicatedLog().getSnapshotIndex());
+ followerLogInformation.setNextIndex(
+ context.getReplicatedLog().getSnapshotIndex() + 1);
+ mapFollowerToSnapshot.remove(followerId);
+ context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" +
+ followerToLog.get(followerId).getNextIndex().get());
+
+ } else {
+ followerToSnapshot.markSendStatus(true);
+ }
+ } else {
+ context.getLogger().info("InstallSnapshotReply received, " +
+ "sending snapshot chunk failed, Will retry, Chunk:{}",
+ reply.getChunkIndex());
+ followerToSnapshot.markSendStatus(false);
+ }
- followerLogInformation
- .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation
- .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+ } else {
+ context.getLogger().error("ERROR!!" +
+ "FollowerId in InstallSnapshotReply not known to Leader" +
+ " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+ followerToSnapshot.getChunkIndex(), reply.getChunkIndex() );
+ }
}
private void replicate(Replicate replicate) {
private void sendAppendEntries() {
// Send an AppendEntries to all followers
for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ long followerNextIndex = followerLogInformation.getNextIndex().get();
List<ReplicatedLogEntry> entries = Collections.emptyList();
- if (context.getReplicatedLog().isPresent(nextIndex)) {
- // FIXME : Sending one entry at a time
- entries =
- context.getReplicatedLog().getFrom(nextIndex, 1);
+ if (mapFollowerToSnapshot.get(followerId) != null) {
+ if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+
+ } else {
+
+ if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+ // FIXME : Sending one entry at a time
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex()).toSerializable(),
+ actor()
+ );
+
+ } else {
+ // if the followers next index is not present in the leaders log, then snapshot should be sent
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
+ // if the follower is just not starting and leader's index
+ // is more than followers index
+ context.getLogger().debug("SendInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex);
+
+ actor().tell(new SendInstallSnapshot(), actor());
+ } else {
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex()).toSerializable(),
+ actor()
+ );
+ }
+ }
}
-
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(nextIndex),
- prevLogTerm(nextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
}
}
}
long nextIndex = followerLogInformation.getNextIndex().get();
- if (!context.getReplicatedLog().isPresent(nextIndex) && context
- .getReplicatedLog().isInSnapshot(nextIndex)) {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().getSnapshot()
- ),
- actor()
- );
+ if (!context.getReplicatedLog().isPresent(nextIndex) &&
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ sendSnapshotChunk(followerActor, followerId);
}
}
}
}
+ /**
+ * Sends a snapshot chunk to a given follower
+ * InstallSnapshot should qualify as a heartbeat too.
+ */
+ private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+ try {
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ getNextSnapshotChunk(followerId,
+ context.getReplicatedLog().getSnapshot()),
+ mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ ).toSerializable(),
+ actor()
+ );
+ context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ } catch (IOException e) {
+ context.getLogger().error("InstallSnapshot failed for Leader.", e);
+ }
+ }
+
+ /**
+ * Acccepts snaphot as ByteString, enters into map for future chunks
+ * creates and return a ByteString chunk
+ */
+ private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ if (followerToSnapshot == null) {
+ followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+ mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+ }
+ ByteString nextChunk = followerToSnapshot.getNextChunk();
+ context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+
+ return nextChunk;
+ }
+
private RaftState sendHeartBeat() {
if (followers.size() > 0) {
sendAppendEntries();
return context.getId();
}
+ /**
+ * Encapsulates the snapshot bytestring and handles the logic of sending
+ * snapshot chunks
+ */
+ protected class FollowerToSnapshot {
+ private ByteString snapshotBytes;
+ private int offset = 0;
+ // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+ private int replyReceivedForOffset;
+ // if replyStatus is false, the previous chunk is attempted
+ private boolean replyStatus = false;
+ private int chunkIndex;
+ private int totalChunks;
+
+ public FollowerToSnapshot(ByteString snapshotBytes) {
+ this.snapshotBytes = snapshotBytes;
+ replyReceivedForOffset = -1;
+ chunkIndex = 1;
+ int size = snapshotBytes.size();
+ totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+ ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+ context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}",
+ size, totalChunks);
+ }
+
+ public ByteString getSnapshotBytes() {
+ return snapshotBytes;
+ }
+
+ public int incrementOffset() {
+ if(replyStatus) {
+ // if prev chunk failed, we would want to sent the same chunk again
+ offset = offset + context.getConfigParams().getSnapshotChunkSize();
+ }
+ return offset;
+ }
+
+ public int incrementChunkIndex() {
+ if (replyStatus) {
+ // if prev chunk failed, we would want to sent the same chunk again
+ chunkIndex = chunkIndex + 1;
+ }
+ return chunkIndex;
+ }
+
+ public int getChunkIndex() {
+ return chunkIndex;
+ }
+
+ public int getTotalChunks() {
+ return totalChunks;
+ }
+
+ public boolean canSendNextChunk() {
+ // we only send a false if a chunk is sent but we have not received a reply yet
+ return replyReceivedForOffset == offset;
+ }
+
+ public boolean isLastChunk(int chunkIndex) {
+ return totalChunks == chunkIndex;
+ }
+
+ public void markSendStatus(boolean success) {
+ if (success) {
+ // if the chunk sent was successful
+ replyReceivedForOffset = offset;
+ replyStatus = true;
+ } else {
+ // if the chunk sent was failure
+ replyReceivedForOffset = offset;
+ replyStatus = false;
+ }
+ }
+
+ public ByteString getNextChunk() {
+ int snapshotLength = getSnapshotBytes().size();
+ int start = incrementOffset();
+ int size = context.getConfigParams().getSnapshotChunkSize();
+ if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+ size = snapshotLength;
+ } else {
+ if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
+ }
+ }
+
+ context.getLogger().debug("length={}, offset={},size={}",
+ snapshotLength, start, size);
+ return getSnapshotBytes().substring(start, start + size);
+
+ }
+ }
+
}
package org.opendaylight.controller.cluster.raft.messages;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+
public class InstallSnapshot extends AbstractRaftRPC {
+ public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class;
+
private final String leaderId;
private final long lastIncludedIndex;
private final long lastIncludedTerm;
- private final Object data;
+ private final ByteString data;
+ private final int chunkIndex;
+ private final int totalChunks;
- public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) {
+ public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+ long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
super(term);
this.leaderId = leaderId;
this.lastIncludedIndex = lastIncludedIndex;
this.lastIncludedTerm = lastIncludedTerm;
this.data = data;
+ this.chunkIndex = chunkIndex;
+ this.totalChunks = totalChunks;
}
public String getLeaderId() {
return lastIncludedTerm;
}
- public Object getData() {
+ public ByteString getData() {
return data;
}
+
+ public int getChunkIndex() {
+ return chunkIndex;
+ }
+
+ public int getTotalChunks() {
+ return totalChunks;
+ }
+
+ public <T extends Object> Object toSerializable(){
+ return InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ .setLeaderId(this.getLeaderId())
+ .setChunkIndex(this.getChunkIndex())
+ .setData(this.getData())
+ .setLastIncludedIndex(this.getLastIncludedIndex())
+ .setLastIncludedTerm(this.getLastIncludedTerm())
+ .setTotalChunks(this.getTotalChunks()).build();
+
+ }
+
+ public static InstallSnapshot fromSerializable (Object o) {
+ InstallSnapshotMessages.InstallSnapshot from =
+ (InstallSnapshotMessages.InstallSnapshot) o;
+
+ InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
+ from.getLeaderId(), from.getLastIncludedIndex(),
+ from.getLastIncludedTerm(), from.getData(),
+ from.getChunkIndex(), from.getTotalChunks());
+
+ return installSnapshot;
+ }
}
// The followerId - this will be used to figure out which follower is
// responding
private final String followerId;
+ private final int chunkIndex;
+ private boolean success;
- protected InstallSnapshotReply(long term, String followerId) {
+ public InstallSnapshotReply(long term, String followerId, int chunkIndex,
+ boolean success) {
super(term);
this.followerId = followerId;
+ this.chunkIndex = chunkIndex;
+ this.success = success;
}
public String getFollowerId() {
return followerId;
}
+
+ public int getChunkIndex() {
+ return chunkIndex;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
}
--- /dev/null
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: InstallSnapshot.proto
+
+package org.opendaylight.controller.cluster.raft.protobuff.messages;
+
+public final class InstallSnapshotMessages {
+ private InstallSnapshotMessages() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface InstallSnapshotOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional int64 term = 1;
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ boolean hasTerm();
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ long getTerm();
+
+ // optional string leaderId = 2;
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ boolean hasLeaderId();
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ java.lang.String getLeaderId();
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ com.google.protobuf.ByteString
+ getLeaderIdBytes();
+
+ // optional int64 lastIncludedIndex = 3;
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ boolean hasLastIncludedIndex();
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ long getLastIncludedIndex();
+
+ // optional int64 lastIncludedTerm = 4;
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ boolean hasLastIncludedTerm();
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ long getLastIncludedTerm();
+
+ // optional bytes data = 5;
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ boolean hasData();
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ com.google.protobuf.ByteString getData();
+
+ // optional int32 chunkIndex = 6;
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ boolean hasChunkIndex();
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ int getChunkIndex();
+
+ // optional int32 totalChunks = 7;
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ boolean hasTotalChunks();
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ int getTotalChunks();
+ }
+ /**
+ * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+ */
+ public static final class InstallSnapshot extends
+ com.google.protobuf.GeneratedMessage
+ implements InstallSnapshotOrBuilder {
+ // Use InstallSnapshot.newBuilder() to construct.
+ private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final InstallSnapshot defaultInstance;
+ public static InstallSnapshot getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public InstallSnapshot getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private InstallSnapshot(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ term_ = input.readInt64();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ leaderId_ = input.readBytes();
+ break;
+ }
+ case 24: {
+ bitField0_ |= 0x00000004;
+ lastIncludedIndex_ = input.readInt64();
+ break;
+ }
+ case 32: {
+ bitField0_ |= 0x00000008;
+ lastIncludedTerm_ = input.readInt64();
+ break;
+ }
+ case 42: {
+ bitField0_ |= 0x00000010;
+ data_ = input.readBytes();
+ break;
+ }
+ case 48: {
+ bitField0_ |= 0x00000020;
+ chunkIndex_ = input.readInt32();
+ break;
+ }
+ case 56: {
+ bitField0_ |= 0x00000040;
+ totalChunks_ = input.readInt32();
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
+ new com.google.protobuf.AbstractParser<InstallSnapshot>() {
+ public InstallSnapshot parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new InstallSnapshot(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<InstallSnapshot> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // optional int64 term = 1;
+ public static final int TERM_FIELD_NUMBER = 1;
+ private long term_;
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ public boolean hasTerm() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ public long getTerm() {
+ return term_;
+ }
+
+ // optional string leaderId = 2;
+ public static final int LEADERID_FIELD_NUMBER = 2;
+ private java.lang.Object leaderId_;
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public boolean hasLeaderId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public java.lang.String getLeaderId() {
+ java.lang.Object ref = leaderId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ leaderId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getLeaderIdBytes() {
+ java.lang.Object ref = leaderId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ leaderId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional int64 lastIncludedIndex = 3;
+ public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3;
+ private long lastIncludedIndex_;
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ public boolean hasLastIncludedIndex() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ public long getLastIncludedIndex() {
+ return lastIncludedIndex_;
+ }
+
+ // optional int64 lastIncludedTerm = 4;
+ public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4;
+ private long lastIncludedTerm_;
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ public boolean hasLastIncludedTerm() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ public long getLastIncludedTerm() {
+ return lastIncludedTerm_;
+ }
+
+ // optional bytes data = 5;
+ public static final int DATA_FIELD_NUMBER = 5;
+ private com.google.protobuf.ByteString data_;
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ public boolean hasData() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ public com.google.protobuf.ByteString getData() {
+ return data_;
+ }
+
+ // optional int32 chunkIndex = 6;
+ public static final int CHUNKINDEX_FIELD_NUMBER = 6;
+ private int chunkIndex_;
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ public boolean hasChunkIndex() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ public int getChunkIndex() {
+ return chunkIndex_;
+ }
+
+ // optional int32 totalChunks = 7;
+ public static final int TOTALCHUNKS_FIELD_NUMBER = 7;
+ private int totalChunks_;
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ public boolean hasTotalChunks() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ public int getTotalChunks() {
+ return totalChunks_;
+ }
+
+ private void initFields() {
+ term_ = 0L;
+ leaderId_ = "";
+ lastIncludedIndex_ = 0L;
+ lastIncludedTerm_ = 0L;
+ data_ = com.google.protobuf.ByteString.EMPTY;
+ chunkIndex_ = 0;
+ totalChunks_ = 0;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeInt64(1, term_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getLeaderIdBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeInt64(3, lastIncludedIndex_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeInt64(4, lastIncludedTerm_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(5, data_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeInt32(6, chunkIndex_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeInt32(7, totalChunks_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(1, term_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getLeaderIdBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(3, lastIncludedIndex_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(4, lastIncludedTerm_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(5, data_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(6, chunkIndex_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(7, totalChunks_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+ }
+
+ // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ term_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ leaderId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ lastIncludedIndex_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ lastIncludedTerm_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ data_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ chunkIndex_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ totalChunks_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000040);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ }
+
+ public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+ return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+ }
+
+ public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
+ org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+ org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.term_ = term_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.leaderId_ = leaderId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.lastIncludedIndex_ = lastIncludedIndex_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.lastIncludedTerm_ = lastIncludedTerm_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.data_ = data_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.chunkIndex_ = chunkIndex_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.totalChunks_ = totalChunks_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
+ return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
+ if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+ if (other.hasTerm()) {
+ setTerm(other.getTerm());
+ }
+ if (other.hasLeaderId()) {
+ bitField0_ |= 0x00000002;
+ leaderId_ = other.leaderId_;
+ onChanged();
+ }
+ if (other.hasLastIncludedIndex()) {
+ setLastIncludedIndex(other.getLastIncludedIndex());
+ }
+ if (other.hasLastIncludedTerm()) {
+ setLastIncludedTerm(other.getLastIncludedTerm());
+ }
+ if (other.hasData()) {
+ setData(other.getData());
+ }
+ if (other.hasChunkIndex()) {
+ setChunkIndex(other.getChunkIndex());
+ }
+ if (other.hasTotalChunks()) {
+ setTotalChunks(other.getTotalChunks());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional int64 term = 1;
+ private long term_ ;
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ public boolean hasTerm() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ public long getTerm() {
+ return term_;
+ }
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ public Builder setTerm(long value) {
+ bitField0_ |= 0x00000001;
+ term_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 term = 1;</code>
+ */
+ public Builder clearTerm() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ term_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // optional string leaderId = 2;
+ private java.lang.Object leaderId_ = "";
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public boolean hasLeaderId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public java.lang.String getLeaderId() {
+ java.lang.Object ref = leaderId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ leaderId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getLeaderIdBytes() {
+ java.lang.Object ref = leaderId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ leaderId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public Builder setLeaderId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ leaderId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public Builder clearLeaderId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ leaderId_ = getDefaultInstance().getLeaderId();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string leaderId = 2;</code>
+ */
+ public Builder setLeaderIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ leaderId_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional int64 lastIncludedIndex = 3;
+ private long lastIncludedIndex_ ;
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ public boolean hasLastIncludedIndex() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ public long getLastIncludedIndex() {
+ return lastIncludedIndex_;
+ }
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ public Builder setLastIncludedIndex(long value) {
+ bitField0_ |= 0x00000004;
+ lastIncludedIndex_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 lastIncludedIndex = 3;</code>
+ */
+ public Builder clearLastIncludedIndex() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ lastIncludedIndex_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // optional int64 lastIncludedTerm = 4;
+ private long lastIncludedTerm_ ;
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ public boolean hasLastIncludedTerm() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ public long getLastIncludedTerm() {
+ return lastIncludedTerm_;
+ }
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ public Builder setLastIncludedTerm(long value) {
+ bitField0_ |= 0x00000008;
+ lastIncludedTerm_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 lastIncludedTerm = 4;</code>
+ */
+ public Builder clearLastIncludedTerm() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ lastIncludedTerm_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // optional bytes data = 5;
+ private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ public boolean hasData() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ public com.google.protobuf.ByteString getData() {
+ return data_;
+ }
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ public Builder setData(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000010;
+ data_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes data = 5;</code>
+ */
+ public Builder clearData() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ data_ = getDefaultInstance().getData();
+ onChanged();
+ return this;
+ }
+
+ // optional int32 chunkIndex = 6;
+ private int chunkIndex_ ;
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ public boolean hasChunkIndex() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ public int getChunkIndex() {
+ return chunkIndex_;
+ }
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ public Builder setChunkIndex(int value) {
+ bitField0_ |= 0x00000020;
+ chunkIndex_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 chunkIndex = 6;</code>
+ */
+ public Builder clearChunkIndex() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ chunkIndex_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // optional int32 totalChunks = 7;
+ private int totalChunks_ ;
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ public boolean hasTotalChunks() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ public int getTotalChunks() {
+ return totalChunks_;
+ }
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ public Builder setTotalChunks(int value) {
+ bitField0_ |= 0x00000040;
+ totalChunks_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 totalChunks = 7;</code>
+ */
+ public Builder clearTotalChunks() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ totalChunks_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+ }
+
+ static {
+ defaultInstance = new InstallSnapshot(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\025InstallSnapshot.proto\022(org.opendayligh" +
+ "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+ "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
+ "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
+ "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
+ " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
+ "light.controller.cluster.raft.protobuff." +
+ "messagesB\027InstallSnapshotMessagesH\001"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
+ new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
--- /dev/null
+package org.opendaylight.controller.cluster.raft;
+
+option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_outer_classname = "InstallSnapshotMessages";
+option optimize_for = SPEED;
+
+message InstallSnapshot {
+ optional int64 term = 1;
+ optional string leaderId = 2;
+ optional int64 lastIncludedIndex = 3;
+ optional int64 lastIncludedTerm = 4;
+ optional bytes data = 5;
+ optional int32 chunkIndex = 6;
+ optional int32 totalChunks = 7;
+}
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-import com.google.common.base.Preconditions;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
public class MockRaftActorContext implements RaftActorContext {
private final ElectionTerm electionTerm;
private ReplicatedLog replicatedLog;
private Map<String, String> peerAddresses = new HashMap();
+ private ConfigParams configParams;
public MockRaftActorContext(){
electionTerm = null;
}
};
+ configParams = new DefaultConfigParamsImpl();
+
initReplicatedLog();
}
@Override
public ConfigParams getConfigParams() {
- return new DefaultConfigParamsImpl();
+ return configParams;
}
- public static class SimpleReplicatedLog implements ReplicatedLog {
- private final List<ReplicatedLogEntry> log = new ArrayList<>();
-
- @Override public ReplicatedLogEntry get(long index) {
- if(index >= log.size() || index < 0){
- return null;
- }
- return log.get((int) index);
- }
-
- @Override public ReplicatedLogEntry last() {
- if(log.size() == 0){
- return null;
- }
- return log.get(log.size()-1);
- }
-
- @Override public long lastIndex() {
- if(log.size() == 0){
- return -1;
- }
-
- return last().getIndex();
- }
-
- @Override public long lastTerm() {
- if(log.size() == 0){
- return -1;
- }
-
- return last().getTerm();
- }
-
- @Override public void removeFrom(long index) {
- if(index >= log.size() || index < 0){
- return;
- }
-
- log.subList((int) index, log.size()).clear();
- //log.remove((int) index);
- }
-
- @Override public void removeFromAndPersist(long index) {
- removeFrom(index);
- }
-
- @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
- log.add(replicatedLogEntry);
- }
+ public void setConfigParams(ConfigParams configParams) {
+ this.configParams = configParams;
+ }
+ public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
@Override public void appendAndPersist(
ReplicatedLogEntry replicatedLogEntry) {
append(replicatedLogEntry);
}
- @Override public List<ReplicatedLogEntry> getFrom(long index) {
- if(index >= log.size() || index < 0){
- return Collections.EMPTY_LIST;
- }
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- for(int i=(int) index ; i < log.size() ; i++) {
- entries.add(get(i));
- }
- return entries;
- }
-
- @Override public List<ReplicatedLogEntry> getFrom(long index, int max) {
- if(index >= log.size() || index < 0){
- return Collections.EMPTY_LIST;
- }
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- int maxIndex = (int) index + max;
- if(maxIndex > log.size()){
- maxIndex = log.size();
- }
-
- for(int i=(int) index ; i < maxIndex ; i++) {
- entries.add(get(i));
- }
- return entries;
-
- }
-
- @Override public long size() {
- return log.size();
- }
-
- @Override public boolean isPresent(long index) {
- if(index >= log.size() || index < 0){
- return false;
- }
-
- return true;
- }
-
- @Override public boolean isInSnapshot(long index) {
- return false;
- }
-
- @Override public Object getSnapshot() {
- return null;
- }
-
- @Override public long getSnapshotIndex() {
- return -1;
- }
-
- @Override public long getSnapshotTerm() {
- return -1;
+ @Override public void removeFromAndPersist(long index) {
+ removeFrom(index);
}
}
import akka.event.Logging;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
+import com.google.protobuf.ByteString;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
Object data) {
}
- @Override protected Object createSnapshot() {
+ @Override protected void createSnapshot() {
throw new UnsupportedOperationException("createSnapshot");
}
- @Override protected void applySnapshot(Object snapshot) {
+ @Override protected void applySnapshot(ByteString snapshot) {
throw new UnsupportedOperationException("applySnapshot");
}
createActorContext();
context.setLastApplied(100);
- setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload(""));
+ setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload(""));
+ ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
List<ReplicatedLogEntry> entries =
Arrays.asList(
- (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
+ (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
new MockRaftActorContext.MockPayload("foo"))
);
// The new commitIndex is 101
AppendEntries appendEntries =
- new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+ new AppendEntries(2, "leader-1", 100, 1, entries, 101);
RaftState raftState =
createBehavior(context).handleMessage(getRef(), appendEntries);
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.google.protobuf.ByteString;
+import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class LeaderTest extends AbstractRaftActorBehaviorTest {
assertEquals("match", out);
}
-
-
};
}};
}
assertEquals("match", out);
}
+ };
+ }};
+ }
+
+ @Test
+ public void testSendInstallSnapshot() {
+ new LeaderTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext(getRef());
+ actorContext.setPeerAddresses(peerAddresses);
+
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshot(
+ toByteString(leadersSnapshot));
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+ MockLeader leader = new MockLeader(actorContext);
+ // set the follower info in leader
+ leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftState raftState = leader.handleMessage(
+ senderActor, new Replicate(null, "state-id", entry));
+
+ assertEquals(RaftState.Leader, raftState);
+
+ // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
+ Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+ @Override
+ protected Boolean match(Object o) throws Exception {
+ if (o instanceof SendInstallSnapshot) {
+ return true;
+ }
+ return false;
+ }
+ }.get();
+
+ boolean sendInstallSnapshotReceived = false;
+ for (Boolean b: matches) {
+ sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
+ }
+
+ assertTrue(sendInstallSnapshotReceived);
+
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testInstallSnapshot() {
+ new LeaderTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ actorContext.setPeerAddresses(peerAddresses);
+
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ MockLeader leader = new MockLeader(actorContext);
+ // set the follower info in leader
+ leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+
+ RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+
+ assertEquals(RaftState.Leader, raftState);
+
+ // check if installsnapshot gets called with the correct values.
+ final String out =
+ new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+ InstallSnapshot is = (InstallSnapshot)
+ SerializationUtils.fromSerializable(in);
+ if (is.getData() == null) {
+ return "InstallSnapshot data is null";
+ }
+ if (is.getLastIncludedIndex() != snapshotIndex) {
+ return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+ }
+ if (is.getLastIncludedTerm() != snapshotTerm) {
+ return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+ }
+ if (is.getTerm() == currentTerm) {
+ return is.getTerm() + "!=" + currentTerm;
+ }
+
+ return "match";
+
+ } else {
+ return "message mismatch:" + in.getClass();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testHandleInstallSnapshotReplyLastChunk() {
+ new LeaderTestKit(getSystem()) {{
+ new Within(duration("1 seconds")) {
+ protected void run() {
+ ActorRef followerActor = getTestActor();
+
+ Map<String, String> peerAddresses = new HashMap();
+ peerAddresses.put(followerActor.path().toString(),
+ followerActor.path().toString());
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ actorContext.setPeerAddresses(peerAddresses);
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockLeader leader = new MockLeader(actorContext);
+ // set the follower info in leader
+ leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshot(
+ toByteString(leadersSnapshot));
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+ while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+ leader.getFollowerToSnapshot().getNextChunk();
+ leader.getFollowerToSnapshot().incrementChunkIndex();
+ }
+
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
+ RaftState raftState = leader.handleMessage(senderActor,
+ new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+ leader.getFollowerToSnapshot().getChunkIndex(), true));
+ assertEquals(RaftState.Leader, raftState);
+
+ assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+ assertEquals(leader.followerToLog.size(), 1);
+ assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+ FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+ assertEquals(snapshotIndex, fli.getMatchIndex().get());
+ assertEquals(snapshotIndex, fli.getMatchIndex().get());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+ }
};
}};
}
+ @Test
+ public void testFollowerToSnapshotLogic() {
+
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ ByteString bs = toByteString(leadersSnapshot);
+ byte[] barray = bs.toByteArray();
+
+ leader.createFollowerToSnapshot("followerId", bs);
+ assertEquals(bs.size(), barray.length);
+
+ int chunkIndex=0;
+ for (int i=0; i < barray.length; i = i + 50) {
+ int j = i + 50;
+ chunkIndex++;
+
+ if (i + 50 > barray.length) {
+ j = barray.length;
+ }
+
+ ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+ assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+
+ leader.getFollowerToSnapshot().markSendStatus(true);
+ if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
+ leader.getFollowerToSnapshot().incrementChunkIndex();
+ }
+ }
+
+ assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+ }
+
+
@Override protected RaftActorBehavior createBehavior(
RaftActorContext actorContext) {
return new Leader(actorContext);
}
@Override protected RaftActorContext createActorContext() {
- return new MockRaftActorContext("test", getSystem(), leaderActor);
+ return createActorContext(leaderActor);
+ }
+
+ protected RaftActorContext createActorContext(ActorRef actorRef) {
+ return new MockRaftActorContext("test", getSystem(), actorRef);
+ }
+
+ private ByteString toByteString(Map<String, String> state) {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(state);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ } catch (IOException e) {
+ Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+ }
+ return null;
+ }
+
+ private static class LeaderTestKit extends JavaTestKit {
+
+ private LeaderTestKit(ActorSystem actorSystem) {
+ super(actorSystem);
+ }
+
+ protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+ // Wait for a specific log message to show up
+ final boolean result =
+ new JavaTestKit.EventFilter<Boolean>(logLevel
+ ) {
+ @Override
+ protected Boolean run() {
+ return true;
+ }
+ }.from(subject.path().toString())
+ .message(logMessage)
+ .occurrences(1).exec();
+
+ Assert.assertEquals(true, result);
+
+ }
+ }
+
+ class MockLeader extends Leader {
+
+ FollowerToSnapshot fts;
+
+ public MockLeader(RaftActorContext context){
+ super(context);
+ }
+
+ public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(followerId,
+ new AtomicLong(nextIndex),
+ new AtomicLong(matchIndex));
+ followerToLog.put(followerId, followerLogInformation);
+ }
+
+ public FollowerToSnapshot getFollowerToSnapshot() {
+ return fts;
+ }
+
+ public void createFollowerToSnapshot(String followerId, ByteString bs ) {
+ fts = new FollowerToSnapshot(bs);
+ mapFollowerToSnapshot.put(followerId, fts);
+
+ }
}
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
identifier, clientActor.path().toString());
}
-
} else {
LOG.error("Unknown state received {}", data);
}
}
- @Override protected Object createSnapshot() {
+ @Override protected void createSnapshot() {
throw new UnsupportedOperationException("createSnapshot");
}
- @Override protected void applySnapshot(Object snapshot) {
+ @Override protected void applySnapshot(ByteString snapshot) {
throw new UnsupportedOperationException("applySnapshot");
}