context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
+ context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
Stopwatch timer = Stopwatch.createStarted();
private final long lastTerm;
private final long lastAppliedIndex;
private final long lastAppliedTerm;
+ private final long electionTerm;
+ private final String electionVotedFor;
- private Snapshot(byte[] state,
- List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
- long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+ private Snapshot(byte[] state, List<ReplicatedLogEntry> unAppliedEntries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
this.state = state;
this.unAppliedEntries = unAppliedEntries;
this.lastIndex = lastIndex;
this.lastTerm = lastTerm;
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
+ this.electionTerm = electionTerm;
+ this.electionVotedFor = electionVotedFor;
}
+ public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm) {
+ return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null);
+ }
- public static Snapshot create(byte[] state,
- List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
- long lastAppliedIndex, long lastAppliedTerm) {
- return new Snapshot(state, entries, lastIndex, lastTerm,
- lastAppliedIndex, lastAppliedTerm);
+ public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
+ return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+ electionTerm, electionVotedFor);
}
public byte[] getState() {
return this.lastIndex;
}
- public String getLogMessage() {
- StringBuilder sb = new StringBuilder();
- return sb.append("Snapshot={")
- .append("lastTerm:" + this.getLastTerm() + ", ")
- .append("lastIndex:" + this.getLastIndex() + ", ")
- .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ")
- .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ")
- .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}")
- .toString();
+ public long getElectionTerm() {
+ return electionTerm;
+ }
+
+
+ public String getElectionVotedFor() {
+ return electionVotedFor;
+ }
+ @Override
+ public String toString() {
+ return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex
+ + ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size()
+ + ", state size=" + state.length + ", electionTerm=" + electionTerm + ", electionVotedFor=" + electionVotedFor
+ + "]";
}
}
Snapshot snapshot = Snapshot.create(snapshotBytes,
captureSnapshot.getUnAppliedEntries(),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+ captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
+ context.getTermInformation().getCurrentTerm(),
+ context.getTermInformation().getVotedFor());
context.getPersistenceProvider().saveSnapshot(snapshot);
- LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot.getLogMessage());
+ LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
long dataThreshold = totalMemory *
context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
boolean logSizeExceededSnapshotBatchCount =
context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
-LOG.debug("Log size: {}, getSnapshotBatchCount: {}",context.getReplicatedLog().size(),context.getConfigParams().getSnapshotBatchCount());
+
if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
if(LOG.isDebugEnabled()) {
if(dataSizeThresholdExceeded) {
installSnapshot.getLastIncludedIndex(),
installSnapshot.getLastIncludedTerm(),
installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm());
+ installSnapshot.getLastIncludedTerm(),
+ context.getTermInformation().getCurrentTerm(),
+ context.getTermInformation().getVotedFor());
actor().tell(new ApplySnapshot(snapshot), actor());
ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
5, new MockRaftActorContext.MockPayload("5", 5));
- int lastAppliedDuringSnapshotCapture = 3;
- int lastIndexDuringSnapshotCapture = 5;
+ long lastAppliedDuringSnapshotCapture = 3;
+ long lastIndexDuringSnapshotCapture = 5;
+ long electionTerm = 2;
+ String electionVotedFor = "member-2";
Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
- lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+ lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+ assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
}
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
MockRaftActor leaderActor = mockActorRef.underlyingActor();
- leaderActor.handleRecover(RecoveryCompleted.getInstance());
+ leaderActor.waitForRecoveryComplete();
leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
-
}
public static ByteString fromObject(Object snapshot) throws Exception {
@Mock
private Procedure<Void> mockProcedure;
+ @Mock
+ private ElectionTerm mockElectionTerm;
+
private SnapshotManager snapshotManager;
private TestActorFactory factory;
doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
doReturn("123").when(mockRaftActorBehavior).getLeaderId();
- ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn("member5").when(mockElectionTerm).getVotedFor();
snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
factory = new TestActorFactory(getSystem());
assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
assertArrayEquals("getState", bytes, snapshot.getState());
assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
+ assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
+ assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+
+/**
+ * Unit tests for Snapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class SnapshotTest {
+
+ @Test
+ public void testBackwardsCompatibleDeserializationFromLithium() throws Exception {
+ Snapshot expSnapshot = newLithiumSnapshot();
+ try(FileInputStream fis = new FileInputStream("src/test/resources/lithium-serialized-Snapshot")) {
+ ObjectInputStream ois = new ObjectInputStream(fis);
+
+ Snapshot snapshot = (Snapshot) ois.readObject();
+ ois.close();
+
+ assertEquals("lastIndex", expSnapshot.getLastIndex(), snapshot.getLastIndex());
+ assertEquals("lastTerm", expSnapshot.getLastTerm(), snapshot.getLastTerm());
+ assertEquals("lastAppliedIndex", expSnapshot.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
+ assertEquals("lastAppliedTerm", expSnapshot.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
+ assertEquals("unAppliedEntries size", expSnapshot.getUnAppliedEntries().size(), snapshot.getUnAppliedEntries().size());
+ assertArrayEquals("state", expSnapshot.getState(), snapshot.getState());
+ assertEquals("electionTerm", 0, snapshot.getElectionTerm());
+ assertEquals("electionVotedFor", null, snapshot.getElectionVotedFor());
+ }
+ }
+
+ private Snapshot newLithiumSnapshot() {
+ byte[] state = {1, 2, 3, 4, 5};
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(new ReplicatedLogImplEntry(6, 2, new MockPayload("payload")));
+ long lastIndex = 6;
+ long lastTerm = 2;
+ long lastAppliedIndex = 5;
+ long lastAppliedTerm = 1;
+
+ return Snapshot.create(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm);
+ }
+
+ /**
+ * Use this method to generate a file with a serialized Snapshot instance to be
+ * used in tests that verify backwards compatible de-serialization.
+ */
+ private void generateSerializedFile(Snapshot snapshot, String fileName) throws IOException {
+ FileOutputStream fos = new FileOutputStream("src/test/resources/" + fileName);
+ ObjectOutputStream oos = new ObjectOutputStream(fos);
+ oos.writeObject(snapshot);
+ fos.close();
+ }
+}
logStart("testHandleInstallSnapshot");
MockRaftActorContext context = createActorContext();
+ context.getTermInformation().update(1, "leader");
follower = createBehavior(context);
snapshot.getLastAppliedIndex());
assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+ assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+ assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
leaderActor, InstallSnapshotReply.class);