Merge "Bug 2769: Fix issues with ApplyLogEntries"
authorMoiz Raja <moraja@cisco.com>
Mon, 2 Mar 2015 22:16:17 +0000 (22:16 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 2 Mar 2015 22:16:18 +0000 (22:16 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 285be39c0b3286c2abbdaf3c08e09c1f85c0224f..cb1b42aa1b1fb95ced067fc152e0760a3ce0576f 100644 (file)
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -147,7 +148,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             } else if (message instanceof ReplicatedLogEntry) {
                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
             } else if (message instanceof ApplyLogEntries) {
-                onRecoveredApplyLogEntries((ApplyLogEntries) message);
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+            } else if (message instanceof ApplyJournalEntries) {
+                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
             } else if (message instanceof DeleteEntries) {
                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
             } else if (message instanceof UpdateElectionTerm) {
@@ -209,18 +213,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         replicatedLog.append(logEntry);
     }
 
-    private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+    private void onRecoveredApplyLogEntries(long toIndex) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
+                    persistenceId(), context.getLastApplied() + 1, toIndex);
         }
 
-        for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
             batchRecoveredLogEntry(replicatedLog.get(i));
         }
 
-        context.setLastApplied(ale.getToIndex());
-        context.setCommitIndex(ale.getToIndex());
+        context.setLastApplied(toIndex);
+        context.setCommitIndex(toIndex);
     }
 
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
@@ -297,14 +301,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
-        } else if (message instanceof ApplyLogEntries){
-            ApplyLogEntries ale = (ApplyLogEntries) message;
+        } else if (message instanceof ApplyJournalEntries){
+            ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
             }
-            persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+            persistence().persist(applyEntries, new Procedure<ApplyJournalEntries>() {
                 @Override
-                public void apply(ApplyLogEntries param) throws Exception {
+                public void apply(ApplyJournalEntries param) throws Exception {
                 }
             });
 
@@ -424,9 +428,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // Apply the state immediately
                             applyState(clientActor, identifier, data);
 
-                            // Send a ApplyLogEntries message so that we write the fact that we applied
+                            // Send a ApplyJournalEntries message so that we write the fact that we applied
                             // the state to durable storage
-                            self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+                            self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
                             if(!context.isSnapshotCaptureInitiated()){
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyJournalEntries.java
new file mode 100644 (file)
index 0000000..ca251d2
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * This is an internal message that is stored in the akka's persistent journal. During recovery, this
+ * message is used to apply recovered journal entries to the state whose indexes range from the context's
+ * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
+ * behavior to the RaftActor to persist.
+ *
+ * @author Thomas Pantelis
+ */
+public class ApplyJournalEntries implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final long toIndex;
+
+    public ApplyJournalEntries(long toIndex) {
+        this.toIndex = toIndex;
+    }
+
+    public long getToIndex() {
+        return toIndex;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ApplyJournalEntries [toIndex=").append(toIndex).append("]");
+        return builder.toString();
+    }
+}
index c395915a0fb5763ab733774b5a4680c97553afe0..744d0098419576696d289768e10d813ad09c4176 100644 (file)
@@ -18,9 +18,11 @@ import java.io.Serializable;
  * This class is also used as a internal message sent from Behaviour to
  * RaftActor to persist the ApplyLogEntries
  *
+ * @deprecated Deprecated in favor of ApplyJournalEntries whose type for toIndex is long instead of int.
+ *             This class was kept for backwards compatibility with Helium.
  */
+@Deprecated
 public class ApplyLogEntries implements Serializable {
-    private static final long serialVersionUID = 1L;
     private final int toIndex;
 
     public ApplyLogEntries(int toIndex) {
index 0b0b4c7cd642480f92dd600a4f8f10be07977dc4..ef5f11e37aef4fe7490887a27d91caedb0e50c51 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -393,7 +393,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         // will be used during recovery
         //in case if the above code throws an error and this message is not sent, it would be fine
         // as the  append entries received later would initiate add this message to the journal
-        actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
+        actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 56bfc21f23c2047a19d5a49c49861ae1bf6eca5e..c0bdc53c51f27907da2de0e2399d619cee8fdda1 100644 (file)
@@ -55,6 +55,7 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -396,10 +397,11 @@ public class RaftActorTest extends AbstractActorTest {
 
             MockAkkaJournal.addToJournal(5, entry2);
             // 2 entries are applied to state besides the 4 entries in snapshot
-            MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+            MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
             MockAkkaJournal.addToJournal(7, entry3);
             MockAkkaJournal.addToJournal(8, entry4);
 
+
             // kill the actor
             followerActor.tell(PoisonPill.getInstance(), null);
             expectMsgClass(duration("5 seconds"), Terminated.class);
@@ -423,6 +425,46 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String persistenceId = factory.generateActorId("leader-");
+
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+            // Setup the persisted journal with some entries
+            ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                    new MockRaftActorContext.MockPayload("zero"));
+            ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+                    new MockRaftActorContext.MockPayload("oen"));
+            ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+                    new MockRaftActorContext.MockPayload("two"));
+
+            long seqNr = 1;
+            MockAkkaJournal.addToJournal(seqNr++, entry0);
+            MockAkkaJournal.addToJournal(seqNr++, entry1);
+            MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
+            MockAkkaJournal.addToJournal(seqNr++, entry2);
+
+            int lastAppliedToState = 1;
+            int lastIndex = 2;
+
+            //reinstate the actor
+            TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
+                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
+                            Optional.<ConfigParams>of(config)));
+
+            leaderActor.underlyingActor().waitForRecoveryComplete();
+
+            RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
+            assertEquals("Journal log size", 3, context.getReplicatedLog().size());
+            assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
+            assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
+            assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
+        }};
+    }
+
     /**
      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
      * process recovery messages
@@ -471,7 +513,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals("add replicated log entry", 2, replicatedLog.size());
 
-                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
 
                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
 
@@ -538,7 +580,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals("add replicated log entry", 0, replicatedLog.size());
 
-                mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
+                mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
 
                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
 
@@ -641,7 +683,7 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testApplyLogEntriesCallsDataPersistence() throws Exception {
+    public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 String persistenceId = factory.generateActorId("leader-");
@@ -659,7 +701,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.waitForInitializeBehaviorComplete();
 
-                mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
+                mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
 
                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
 
index c57fce1cd553d8c41dd786d9c4bb6f33e97791b6..6964db51f273f52ac0591b80cf75edf5997ec68c 100644 (file)
@@ -27,7 +27,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
@@ -929,12 +929,12 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, leaderActorContext.getCommitIndex());
 
-        ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
-                leaderActor, ApplyLogEntries.class);
+        ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+                leaderActor, ApplyJournalEntries.class);
 
         assertEquals(2, leaderActorContext.getLastApplied());
 
-        assertEquals(2, applyLogEntries.getToIndex());
+        assertEquals(2, applyJournalEntries.getToIndex());
 
         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
                 ApplyState.class);
index 85edc07bc5c683a136d0bbb93f534ecc6f2e7103..47864be41161a060c71b35f52ce835dbd0064b61 100644 (file)
@@ -15,14 +15,13 @@ import akka.persistence.PersistentImpl;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.google.common.collect.Maps;
-import scala.concurrent.Future;
-
 import java.util.Map;
 import java.util.concurrent.Callable;
+import scala.concurrent.Future;
 
 public class MockAkkaJournal extends AsyncWriteJournal {
 
-    private static Map<Long, Object> journal = Maps.newHashMap();
+    private static Map<Long, Object> journal = Maps.newLinkedHashMap();
 
     public static void addToJournal(long sequenceNr, Object message) {
         journal.put(sequenceNr, message);
index 4b0651a48eb07f4814bd990386bbe94d32159dfd..bbbc4db5e351f99df3856e807b146f18ac1899d1 100644 (file)
@@ -74,6 +74,7 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -492,7 +493,7 @@ public class ShardTest extends AbstractActorTest {
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
-                new ApplyLogEntries(nListEntries));
+                new ApplyJournalEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }