import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
context.getReplicatedLog().getSnapshotTerm(),
context.getReplicatedLog().size());
- } else if (message instanceof AddRaftPeer){
-
- // FIXME : Do not add raft peers like this.
- // When adding a new Peer we have to ensure that the a majority of
- // the peers know about the new Peer. Doing it this way may cause
- // a situation where multiple Leaders may emerge
- AddRaftPeer arp = (AddRaftPeer)message;
- context.addToPeers(arp.getName(), arp.getAddress());
-
- } else if (message instanceof RemoveRaftPeer){
-
- RemoveRaftPeer rrp = (RemoveRaftPeer)message;
- context.removePeer(rrp.getName());
-
} else if (message instanceof CaptureSnapshot) {
LOG.info("CaptureSnapshot received by actor");
CaptureSnapshot cs = (CaptureSnapshot)message;
}
public java.util.Set<String> getPeers() {
+
return context.getPeerAddresses().keySet();
}
//be greedy and remove entries from in-mem journal which are in the snapshot
// and update snapshotIndex and snapshotTerm without waiting for the success,
- context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+ context.getReplicatedLog().snapshotPreCommit(
captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
"and term:{}", captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
+ if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
+ // this would be call straight to the leader and won't initiate in serialization
+ currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+ }
+
captureSnapshot = null;
hasSnapshotCaptureInitiated = false;
}
-
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
public ReplicatedLogImpl(Snapshot snapshot) {
- super(ByteString.copyFrom(snapshot.getState()),
- snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+ super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
snapshot.getUnAppliedEntries());
}
}
static class DeleteEntries implements Serializable {
+ private static final long serialVersionUID = 1L;
private final int fromIndex;
-
public DeleteEntries(int fromIndex) {
this.fromIndex = fromIndex;
}
}
static class UpdateElectionTerm implements Serializable {
+ private static final long serialVersionUID = 1L;
private final long currentTerm;
private final String votedFor;
}
}
+ @VisibleForTesting
+ void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
+ currentBehavior = behavior;
+ }
+
+ protected RaftActorBehavior getCurrentBehavior() {
+ return currentBehavior;
+ }
}