import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
fts.setSnapshotBytes(bs);
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
- assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
- assertTrue(cs.isInstallSnapshotInitiated());
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(),
- lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+ byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
assertTrue(raftBehavior instanceof Leader);
leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
- Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(),
- lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+ byte[] bytes = toByteString(leadersSnapshot).toByteArray();
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes)));
assertTrue(raftBehavior instanceof Leader);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
fts.setSnapshotBytes(bs);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
- commitIndex, snapshotTerm, commitIndex, snapshotTerm);
- leader.setSnapshot(snapshot);
+ Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
+ Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ -1, null, null);
- leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
InstallSnapshot.class);