import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActor;
import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.cluster.raft.RaftActorTest;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@Override
protected MockRaftActorContext createActorContext(ActorRef actorRef) {
MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
- context.setPayloadVersion(payloadVersion );
+ context.setPayloadVersion(payloadVersion);
return context;
}
// AppendEntries is now sent with a bigger term
// this will set the receivers term to be the same as the sender's term
- AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
+ AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
+ (short)0);
follower = createBehavior(context);
follower = createBehavior(context);
- ByteString bsSnapshot = createSnapshot();
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
snapshot.getLastAppliedIndex());
assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
- Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+ assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
+ Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
applySnapshot.getCallback().onSuccess();
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+ InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
+ InMemoryJournal.waitForWriteMessagesComplete(id);
// We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 1 has already been applied.
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
- MockRaftActor.toObject(snapshot.getState()));
+ MockRaftActor.fromState(snapshot.getState()));
}
@Test
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+ InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
+ InMemoryJournal.waitForWriteMessagesComplete(id);
// We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 2 has already been applied.
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
- entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+ entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+ InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
+ InMemoryJournal.waitForWriteMessagesComplete(id);
// We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 0 has already been applied.
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
- MockRaftActor.toObject(snapshot.getState()));
+ MockRaftActor.fromState(snapshot.getState()));
}
@SuppressWarnings("checkstyle:IllegalCatch")
private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef) {
+ public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
try {
- actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
- followerRaftActor.get().getState()).toByteArray()), actorRef);
+ actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
+ installSnapshotStream), actorRef);
} catch (Exception e) {
Throwables.propagate(e);
}
}
@Override
- public void applySnapshot(byte[] snapshotBytes) {
+ public void applySnapshot(State snapshotState) {
+ }
+
+ @Override
+ public State deserializeSnapshot(ByteSource snapshotBytes) {
+ throw new UnsupportedOperationException();
}
};
return snapshotCohort;