Bug 2769: Fix issues with ApplyLogEntries 01/15901/4
authorTom Pantelis <tpanteli@brocade.com>
Sun, 1 Mar 2015 13:16:28 +0000 (08:16 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Sun, 1 Mar 2015 14:36:34 +0000 (09:36 -0500)
ApplyLogEntries defines toIndex as an int which is inconsistent with
everything else that stores journal indexes as long. In addition this
would cause an insiduous bug if an index was greater than max-int.

To maintain backwards compatibility, we can't change the type in
ApplyLogEntries so I created a new class, ApplyJournalEntries, that
defines toIndex as a long and deprecated ApplyLogEntries.

Also, the serialVersionUID for ApplyLogEntries was previously added in
Lithium however this would cause de-serialization failures when
recovering a pre-Lithium journal from persistence. So I removed the
serialVersionUID.

Change-Id: I7c5fe3a2ef8de292224a1a278abe53fc774a79d8
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 285be39c0b3286c2abbdaf3c08e09c1f85c0224f..cb1b42aa1b1fb95ced067fc152e0760a3ce0576f 100644 (file)
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -147,7 +148,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             } else if (message instanceof ReplicatedLogEntry) {
                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
             } else if (message instanceof ApplyLogEntries) {
-                onRecoveredApplyLogEntries((ApplyLogEntries) message);
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+            } else if (message instanceof ApplyJournalEntries) {
+                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
             } else if (message instanceof DeleteEntries) {
                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
             } else if (message instanceof UpdateElectionTerm) {
@@ -209,18 +213,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         replicatedLog.append(logEntry);
     }
 
-    private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+    private void onRecoveredApplyLogEntries(long toIndex) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
+                    persistenceId(), context.getLastApplied() + 1, toIndex);
         }
 
-        for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
             batchRecoveredLogEntry(replicatedLog.get(i));
         }
 
-        context.setLastApplied(ale.getToIndex());
-        context.setCommitIndex(ale.getToIndex());
+        context.setLastApplied(toIndex);
+        context.setCommitIndex(toIndex);
     }
 
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
@@ -297,14 +301,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
-        } else if (message instanceof ApplyLogEntries){
-            ApplyLogEntries ale = (ApplyLogEntries) message;
+        } else if (message instanceof ApplyJournalEntries){
+            ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
             }
-            persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+            persistence().persist(applyEntries, new Procedure<ApplyJournalEntries>() {
                 @Override
-                public void apply(ApplyLogEntries param) throws Exception {
+                public void apply(ApplyJournalEntries param) throws Exception {
                 }
             });
 
@@ -424,9 +428,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // Apply the state immediately
                             applyState(clientActor, identifier, data);
 
-                            // Send a ApplyLogEntries message so that we write the fact that we applied
+                            // Send a ApplyJournalEntries message so that we write the fact that we applied
                             // the state to durable storage
-                            self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+                            self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
                             if(!context.isSnapshotCaptureInitiated()){
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java
new file mode 100644 (file)
index 0000000..ca251d2
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * This is an internal message that is stored in the akka's persistent journal. During recovery, this
+ * message is used to apply recovered journal entries to the state whose indexes range from the context's
+ * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
+ * behavior to the RaftActor to persist.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntries implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long toIndex;
+
+    public ApplyJournalEntries(long toIndex) {
+        this.toIndex = toIndex;
+    }
+
+    public long getToIndex() {
+        return toIndex;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ApplyJournalEntries [toIndex=").append(toIndex).append("]");
+        return builder.toString();
+    }
+}
index c395915a0fb5763ab733774b5a4680c97553afe0..744d0098419576696d289768e10d813ad09c4176 100644 (file)
@@ -18,9 +18,11 @@ import java.io.Serializable;
  * This class is also used as a internal message sent from Behaviour to
  * RaftActor to persist the ApplyLogEntries
  *
+ * @deprecated Deprecated in favor of ApplyJournalEntries whose type for toIndex is long instead of int.
+ *             This class was kept for backwards compatibility with Helium.
  */
+@Deprecated
 public class ApplyLogEntries implements Serializable {
-    private static final long serialVersionUID = 1L;
     private final int toIndex;
 
     public ApplyLogEntries(int toIndex) {
index 0b0b4c7cd642480f92dd600a4f8f10be07977dc4..ef5f11e37aef4fe7490887a27d91caedb0e50c51 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -393,7 +393,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         // will be used during recovery
         //in case if the above code throws an error and this message is not sent, it would be fine
         // as the  append entries received later would initiate add this message to the journal
-        actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
+        actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 56bfc21f23c2047a19d5a49c49861ae1bf6eca5e..c0bdc53c51f27907da2de0e2399d619cee8fdda1 100644 (file)
@@ -55,6 +55,7 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -396,10 +397,11 @@ public class RaftActorTest extends AbstractActorTest {
 
             MockAkkaJournal.addToJournal(5, entry2);
             // 2 entries are applied to state besides the 4 entries in snapshot
-            MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+            MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
             MockAkkaJournal.addToJournal(7, entry3);
             MockAkkaJournal.addToJournal(8, entry4);
 
+
             // kill the actor
             followerActor.tell(PoisonPill.getInstance(), null);
             expectMsgClass(duration("5 seconds"), Terminated.class);
@@ -423,6 +425,46 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String persistenceId = factory.generateActorId("leader-");
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+            // Setup the persisted journal with some entries
+            ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                    new MockRaftActorContext.MockPayload("zero"));
+            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+                    new MockRaftActorContext.MockPayload("oen"));
+            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+                    new MockRaftActorContext.MockPayload("two"));
+
+            long seqNr = 1;
+            MockAkkaJournal.addToJournal(seqNr++, entry0);
+            MockAkkaJournal.addToJournal(seqNr++, entry1);
+            MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
+            MockAkkaJournal.addToJournal(seqNr++, entry2);
+
+            int lastAppliedToState = 1;
+            int lastIndex = 2;
+
+            //reinstate the actor
+            TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
+                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
+                            Optional.<ConfigParams>of(config)));
+
+            leaderActor.underlyingActor().waitForRecoveryComplete();
+
+            RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
+            assertEquals("Journal log size", 3, context.getReplicatedLog().size());
+            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+        }};
+    }
+
     /**
      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
      * process recovery messages
@@ -471,7 +513,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals("add replicated log entry", 2, replicatedLog.size());
 
-                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
 
                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
 
@@ -538,7 +580,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals("add replicated log entry", 0, replicatedLog.size());
 
-                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
 
                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
 
@@ -641,7 +683,7 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+    public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 String persistenceId = factory.generateActorId("leader-");
@@ -659,7 +701,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.waitForInitializeBehaviorComplete();
 
-                mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+                mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
 
                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
 
index c57fce1cd553d8c41dd786d9c4bb6f33e97791b6..6964db51f273f52ac0591b80cf75edf5997ec68c 100644 (file)
@@ -27,7 +27,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
@@ -929,12 +929,12 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, leaderActorContext.getCommitIndex());
 
-        ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
-                leaderActor, ApplyLogEntries.class);
+        ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+                leaderActor, ApplyJournalEntries.class);
 
         assertEquals(2, leaderActorContext.getLastApplied());
 
-        assertEquals(2, applyLogEntries.getToIndex());
+        assertEquals(2, applyJournalEntries.getToIndex());
 
         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
                 ApplyState.class);
index 85edc07bc5c683a136d0bbb93f534ecc6f2e7103..47864be41161a060c71b35f52ce835dbd0064b61 100644 (file)
@@ -15,14 +15,13 @@ import akka.persistence.PersistentImpl;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.google.common.collect.Maps;
-import scala.concurrent.Future;
-
 import java.util.Map;
 import java.util.concurrent.Callable;
+import scala.concurrent.Future;
 
 public class MockAkkaJournal extends AsyncWriteJournal {
 
-    private static Map<Long, Object> journal = Maps.newHashMap();
+    private static Map<Long, Object> journal = Maps.newLinkedHashMap();
 
     public static void addToJournal(long sequenceNr, Object message) {
         journal.put(sequenceNr, message);
index 4b0651a48eb07f4814bd990386bbe94d32159dfd..bbbc4db5e351f99df3856e807b146f18ac1899d1 100644 (file)
@@ -74,6 +74,7 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -492,7 +493,7 @@ public class ShardTest extends AbstractActorTest {
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
-                new ApplyLogEntries(nListEntries));
+                new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }