Add election info to Snapshot 95/27495/6
authorTom Pantelis <tpanteli@brocade.com>
Sat, 26 Sep 2015 15:13:06 +0000 (11:13 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 29 Sep 2015 11:35:32 +0000 (11:35 +0000)
When a snaphot is saved we delete all prior applied data entries from
the journal. However this also has the side-effect of also deleting prior
UpdateElectionTerm entries so, on restart, we lose the election term
info. We need to persist the election term wih the Snapshot.

Change-Id: I0ed140de1868cc03a28cfbc1d6eb909fe4dbc252
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/resources/lithium-serialized-Snapshot [new file with mode: 0644]

index 85de4da..caa853d 100644 (file)
@@ -127,6 +127,7 @@ class RaftActorRecoverySupport {
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
+        context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
         Stopwatch timer = Stopwatch.createStarted();
 
index feccea7..a369c25 100644 (file)
@@ -20,24 +20,30 @@ public class Snapshot implements Serializable {
     private final long lastTerm;
     private final long lastAppliedIndex;
     private final long lastAppliedTerm;
+    private final long electionTerm;
+    private final String electionVotedFor;
 
-    private Snapshot(byte[] state,
-        List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
-        long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+    private Snapshot(byte[] state, List<ReplicatedLogEntry> unAppliedEntries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
         this.state = state;
         this.unAppliedEntries = unAppliedEntries;
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
+        this.electionTerm = electionTerm;
+        this.electionVotedFor = electionVotedFor;
     }
 
+    public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm) {
+        return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null);
+    }
 
-    public static Snapshot create(byte[] state,
-        List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm) {
-        return new Snapshot(state, entries, lastIndex, lastTerm,
-            lastAppliedIndex, lastAppliedTerm);
+    public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
+        return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+                electionTerm, electionVotedFor);
     }
 
     public byte[] getState() {
@@ -64,15 +70,20 @@ public class Snapshot implements Serializable {
         return this.lastIndex;
     }
 
-    public String getLogMessage() {
-        StringBuilder sb = new StringBuilder();
-        return sb.append("Snapshot={")
-            .append("lastTerm:" + this.getLastTerm() + ", ")
-            .append("lastIndex:" + this.getLastIndex()  + ", ")
-            .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
-            .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
-            .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
-            .toString();
+    public long getElectionTerm() {
+        return electionTerm;
+    }
+
+
+    public String getElectionVotedFor() {
+        return electionVotedFor;
+    }
 
+    @Override
+    public String toString() {
+        return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex
+                + ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size()
+                + ", state size=" + state.length + ", electionTerm=" + electionTerm + ", electionVotedFor=" + electionVotedFor
+                + "]";
     }
 }
index 4c3fb10..c553a39 100644 (file)
@@ -293,11 +293,13 @@ public class SnapshotManager implements SnapshotState {
             Snapshot snapshot = Snapshot.create(snapshotBytes,
                     captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
-                    captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+                    captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
+                    context.getTermInformation().getCurrentTerm(),
+                    context.getTermInformation().getVotedFor());
 
             context.getPersistenceProvider().saveSnapshot(snapshot);
 
-            LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot.getLogMessage());
+            LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
 
             long dataThreshold = totalMemory *
                     context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
@@ -305,7 +307,7 @@ public class SnapshotManager implements SnapshotState {
 
             boolean logSizeExceededSnapshotBatchCount =
                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
-LOG.debug("Log size: {}, getSnapshotBatchCount: {}",context.getReplicatedLog().size(),context.getConfigParams().getSnapshotBatchCount());
+
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
                 if(LOG.isDebugEnabled()) {
                     if(dataSizeThresholdExceeded) {
index c853561..d516f9c 100644 (file)
@@ -355,7 +355,9 @@ public class Follower extends AbstractRaftActorBehavior {
                         installSnapshot.getLastIncludedIndex(),
                         installSnapshot.getLastIncludedTerm(),
                         installSnapshot.getLastIncludedIndex(),
-                        installSnapshot.getLastIncludedTerm());
+                        installSnapshot.getLastIncludedTerm(),
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getTermInformation().getVotedFor());
 
                 actor().tell(new ApplySnapshot(snapshot), actor());
 
index b4c6cab..26bf722 100644 (file)
@@ -185,11 +185,13 @@ public class RaftActorRecoverySupportTest {
         ReplicatedLogEntry unAppliedEntry2 = new MockRaftActorContext.MockReplicatedLogEntry(1,
                 5, new MockRaftActorContext.MockPayload("5", 5));
 
-        int lastAppliedDuringSnapshotCapture = 3;
-        int lastIndexDuringSnapshotCapture = 5;
+        long lastAppliedDuringSnapshotCapture = 3;
+        long lastIndexDuringSnapshotCapture = 5;
+        long electionTerm = 2;
+        String electionVotedFor = "member-2";
 
         Snapshot snapshot = Snapshot.create(snapshotBytes, Arrays.asList(unAppliedEntry1, unAppliedEntry2),
-                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
+                lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1, electionTerm, electionVotedFor);
 
         SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
         SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
@@ -203,6 +205,8 @@ public class RaftActorRecoverySupportTest {
         assertEquals("Commit index", lastAppliedDuringSnapshotCapture, context.getCommitIndex());
         assertEquals("Snapshot term", 1, context.getReplicatedLog().getSnapshotTerm());
         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+        assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
 
         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
     }
index a238237..ed4ea72 100644 (file)
@@ -25,7 +25,6 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
@@ -961,7 +960,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         MockRaftActor leaderActor = mockActorRef.underlyingActor();
 
-        leaderActor.handleRecover(RecoveryCompleted.getInstance());
+        leaderActor.waitForRecoveryComplete();
 
         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
 
@@ -982,8 +981,6 @@ public class RaftActorTest extends AbstractActorTest {
 
         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-
-
     }
 
     public static ByteString fromObject(Object snapshot) throws Exception {
index df83f8f..f1e7d3f 100644 (file)
@@ -62,6 +62,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
     @Mock
     private Procedure<Void> mockProcedure;
 
+    @Mock
+    private ElectionTerm mockElectionTerm;
+
     private SnapshotManager snapshotManager;
 
     private TestActorFactory factory;
@@ -81,9 +84,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
 
-        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
         doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
         doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn("member5").when(mockElectionTerm).getVotedFor();
 
         snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
         factory = new TestActorFactory(getSystem());
@@ -258,6 +261,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
         assertArrayEquals("getState", bytes, snapshot.getState());
         assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
+        assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
+        assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
 
         verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotTest.java
new file mode 100644 (file)
index 0000000..9302b6d
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+
+/**
+ * Unit tests for Snapshot.
+ *
+ * @author Thomas Pantelis
+ */
+public class SnapshotTest {
+
+    @Test
+    public void testBackwardsCompatibleDeserializationFromLithium() throws Exception {
+        Snapshot expSnapshot = newLithiumSnapshot();
+        try(FileInputStream fis = new FileInputStream("src/test/resources/lithium-serialized-Snapshot")) {
+            ObjectInputStream ois = new ObjectInputStream(fis);
+
+            Snapshot snapshot = (Snapshot) ois.readObject();
+            ois.close();
+
+            assertEquals("lastIndex", expSnapshot.getLastIndex(), snapshot.getLastIndex());
+            assertEquals("lastTerm", expSnapshot.getLastTerm(), snapshot.getLastTerm());
+            assertEquals("lastAppliedIndex", expSnapshot.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
+            assertEquals("lastAppliedTerm", expSnapshot.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
+            assertEquals("unAppliedEntries size", expSnapshot.getUnAppliedEntries().size(), snapshot.getUnAppliedEntries().size());
+            assertArrayEquals("state", expSnapshot.getState(), snapshot.getState());
+            assertEquals("electionTerm", 0, snapshot.getElectionTerm());
+            assertEquals("electionVotedFor", null, snapshot.getElectionVotedFor());
+        }
+    }
+
+    private Snapshot newLithiumSnapshot() {
+        byte[] state = {1, 2, 3, 4, 5};
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(new ReplicatedLogImplEntry(6, 2, new MockPayload("payload")));
+        long lastIndex = 6;
+        long lastTerm = 2;
+        long lastAppliedIndex = 5;
+        long lastAppliedTerm = 1;
+
+        return Snapshot.create(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm);
+    }
+
+    /**
+     * Use this method to generate a file with a serialized Snapshot instance to be
+     * used in tests that verify backwards compatible de-serialization.
+     */
+    private void generateSerializedFile(Snapshot snapshot, String fileName) throws IOException {
+        FileOutputStream fos = new FileOutputStream("src/test/resources/" + fileName);
+        ObjectOutputStream oos = new ObjectOutputStream(fos);
+        oos.writeObject(snapshot);
+        fos.close();
+    }
+}
index f189e2d..a06d086 100644 (file)
@@ -749,6 +749,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         logStart("testHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
+        context.getTermInformation().update(1, "leader");
 
         follower = createBehavior(context);
 
@@ -782,6 +783,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 snapshot.getLastAppliedIndex());
         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
 
         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
                 leaderActor, InstallSnapshotReply.class);
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/resources/lithium-serialized-Snapshot b/opendaylight/md-sal/sal-akka-raft/src/test/resources/lithium-serialized-Snapshot
new file mode 100644 (file)
index 0000000..5947c4a
Binary files /dev/null and b/opendaylight/md-sal/sal-akka-raft/src/test/resources/lithium-serialized-Snapshot differ