Merge "Bug-1903:On recovery all replicated log entries should not be applied to state"
authorMoiz Raja <moraja@cisco.com>
Thu, 18 Sep 2014 16:30:45 +0000 (16:30 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 18 Sep 2014 16:30:45 +0000 (16:30 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf

index 8270f2949a67cc9fc00f5180dce41872ca6a8a47..6e1a13cf0c19669443b9273e1d2703a3ff2dede9 100644 (file)
@@ -21,6 +21,7 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -147,12 +148,20 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
-            // Apply State immediately
+            LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
             replicatedLog.append(logEntry);
-            applyState(null, "recovery", logEntry.getData());
-            context.setLastApplied(logEntry.getIndex());
-            context.setCommitIndex(logEntry.getIndex());
+
+        } else if (message instanceof ApplyLogEntries) {
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+
+            LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+                context.getLastApplied() + 1, ale.getToIndex());
+
+            for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+                applyState(null, "recovery", replicatedLog.get(i).getData());
+            }
+            context.setLastApplied(ale.getToIndex());
+            context.setCommitIndex(ale.getToIndex());
 
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
@@ -187,6 +196,15 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+        } else if (message instanceof ApplyLogEntries){
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+            LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+                @Override
+                public void apply(ApplyLogEntries param) throws Exception {
+                }
+            });
+
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java
new file mode 100644 (file)
index 0000000..af3c4fd
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * ApplyLogEntries serves as a message which is stored in the akka's persistent
+ * journal.
+ * During recovery if this message is found, then all in-mem journal entries from
+ * context.lastApplied to ApplyLogEntries.toIndex are applied to the state
+ *
+ * This class is also used as a internal message sent from Behaviour to
+ * RaftActor to persist the ApplyLogEntries
+ *
+ */
+public class ApplyLogEntries implements Serializable {
+    private final int toIndex;
+
+    public ApplyLogEntries(int toIndex) {
+        this.toIndex = toIndex;
+    }
+
+    public int getToIndex() {
+        return toIndex;
+    }
+}
index 35d563b784cf3f4705784a78952249ce06badbea..b1560a5648b283e028ae0dc96e4267d92c6f438f 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -347,6 +348,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         }
         context.getLogger().debug("Setting last applied to {}", newLastApplied);
         context.setLastApplied(newLastApplied);
+
+        // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
+        // will be used during recovery
+        //in case if the above code throws an error and this message is not sent, it would be fine
+        // as the  append entries received later would initiate add this message to the journal
+        actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
     }
 
     protected Object fromSerializableMessage(Object serializable){
index 9b099c2abac8223529b750c6ea906925105ec487..998c198756d191df038de6f6cc75a8091fd0d1f1 100644 (file)
@@ -9,12 +9,21 @@ import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.protobuf.ByteString;
+import org.junit.After;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -25,13 +34,21 @@ import static junit.framework.TestCase.assertEquals;
 public class RaftActorTest extends AbstractActorTest {
 
 
+    @After
+    public void tearDown() {
+        MockAkkaJournal.clearJournal();
+        MockSnapshotStore.setMockSnapshot(null);
+    }
+
     public static class MockRaftActor extends RaftActor {
 
-        boolean applySnapshotCalled = false;
+        private boolean applySnapshotCalled = false;
+        private List<Object> state;
 
         public MockRaftActor(String id,
             Map<String, String> peerAddresses) {
             super(id, peerAddresses);
+            state = new ArrayList<>();
         }
 
         public RaftActorContext getRaftActorContext() {
@@ -42,6 +59,10 @@ public class RaftActorTest extends AbstractActorTest {
             return applySnapshotCalled;
         }
 
+        public List<Object> getState() {
+            return state;
+        }
+
         public static Props props(final String id, final Map<String, String> peerAddresses){
             return Props.create(new Creator<MockRaftActor>(){
 
@@ -51,9 +72,8 @@ public class RaftActorTest extends AbstractActorTest {
             });
         }
 
-        @Override protected void applyState(ActorRef clientActor,
-            String identifier,
-            Object data) {
+        @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+            state.add(data);
         }
 
         @Override protected void createSnapshot() {
@@ -61,7 +81,17 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
-           applySnapshotCalled = true;
+            applySnapshotCalled = true;
+            try {
+                Object data = toObject(snapshot);
+                if (data instanceof List) {
+                    state.addAll((List) data);
+                }
+            } catch (ClassNotFoundException e) {
+                e.printStackTrace();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
         }
 
         @Override protected void onStateChanged() {
@@ -71,6 +101,26 @@ public class RaftActorTest extends AbstractActorTest {
             return this.getId();
         }
 
+        private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+            Object obj = null;
+            ByteArrayInputStream bis = null;
+            ObjectInputStream ois = null;
+            try {
+                bis = new ByteArrayInputStream(bs.toByteArray());
+                ois = new ObjectInputStream(bis);
+                obj = ois.readObject();
+            } finally {
+                if (bis != null) {
+                    bis.close();
+                }
+                if (ois != null) {
+                    ois.close();
+                }
+            }
+            return obj;
+        }
+
+
     }
 
 
@@ -151,7 +201,7 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testActorRecovery() {
+    public void testRaftActorRecovery() {
         new JavaTestKit(getSystem()) {{
             new Within(duration("1 seconds")) {
                 protected void run() {
@@ -161,20 +211,50 @@ public class RaftActorTest extends AbstractActorTest {
                     ActorRef followerActor = getSystem().actorOf(
                         MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
 
+                    List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
+                    snapshotUnappliedEntries.add(entry1);
+
+                    int lastAppliedDuringSnapshotCapture = 3;
+                    int lastIndexDuringSnapshotCapture = 4;
 
+                    ByteString snapshotBytes = null;
+                    try {
+                        // 4 messages as part of snapshot, which are applied to state
+                        snapshotBytes  = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
+                            new MockRaftActorContext.MockPayload("B"),
+                            new MockRaftActorContext.MockPayload("C"),
+                            new MockRaftActorContext.MockPayload("D")));
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                    Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+                        snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+                        lastAppliedDuringSnapshotCapture, 1);
+                    MockSnapshotStore.setMockSnapshot(snapshot);
+                    MockSnapshotStore.setPersistenceId(persistenceId);
+
+                    // add more entries after snapshot is taken
                     List<ReplicatedLogEntry> entries = new ArrayList<>();
-                    ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
                     ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
-                    entries.add(entry1);
+                    ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
+                    ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
                     entries.add(entry2);
+                    entries.add(entry3);
+                    entries.add(entry4);
 
-                    int lastApplied = 3;
-                    int lastIndex = 5;
-                    Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
-                    MockSnapshotStore.setMockSnapshot(snapshot);
-                    MockSnapshotStore.setPersistenceId(persistenceId);
+                    int lastAppliedToState = 5;
+                    int lastIndex = 7;
+
+                    MockAkkaJournal.addToJournal(5, entry2);
+                    // 2 entries are applied to state besides the 4 entries in snapshot
+                    MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+                    MockAkkaJournal.addToJournal(7, entry3);
+                    MockAkkaJournal.addToJournal(8, entry4);
 
+                    // kill the actor
                     followerActor.tell(PoisonPill.getInstance(), null);
+
                     try {
                         // give some time for actor to die
                         Thread.sleep(200);
@@ -182,24 +262,47 @@ public class RaftActorTest extends AbstractActorTest {
                         e.printStackTrace();
                     }
 
-                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
+                    //reinstate the actor
+                    TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
+
                     try {
                         //give some time for snapshot offer to get called.
                         Thread.sleep(200);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
+
                     RaftActorContext context = ref.underlyingActor().getRaftActorContext();
-                    assertEquals(entries.size(), context.getReplicatedLog().size());
-                    assertEquals(lastApplied, context.getLastApplied());
-                    assertEquals(lastApplied, context.getCommitIndex());
+                    assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
+                    assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
+                    assertEquals(lastAppliedToState, context.getLastApplied());
+                    assertEquals(lastAppliedToState, context.getCommitIndex());
                     assertTrue(ref.underlyingActor().isApplySnapshotCalled());
+                    assertEquals(6, ref.underlyingActor().getState().size());
                 }
-
             };
         }};
 
     }
 
-
+    private ByteString fromObject(Object snapshot) throws Exception {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            b = new ByteArrayOutputStream();
+            o = new ObjectOutputStream(b);
+            o.writeObject(snapshot);
+            byte[] snapshotBytes = b.toByteArray();
+            return ByteString.copyFrom(snapshotBytes);
+        } finally {
+            if (o != null) {
+                o.flush();
+                o.close();
+            }
+            if (b != null) {
+                b.close();
+            }
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java
new file mode 100644 (file)
index 0000000..85edc07
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.google.common.collect.Maps;
+import scala.concurrent.Future;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class MockAkkaJournal extends AsyncWriteJournal {
+
+    private static Map<Long, Object> journal = Maps.newHashMap();
+
+    public static void addToJournal(long sequenceNr, Object message) {
+        journal.put(sequenceNr, message);
+    }
+
+    public static void clearJournal() {
+        journal.clear();
+    }
+
+    @Override
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
+        long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+
+        return Futures.future(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+                    PersistentRepr persistentMessage =
+                        new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
+                    replayCallback.apply(persistentMessage);
+                }
+                return null;
+            }
+        }, context().dispatcher());
+    }
+
+    @Override
+    public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
+        return Futures.successful(new Long(0));
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> persistentReprs) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> persistentConfirmations) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds, boolean b) {
+        return Futures.successful(null);
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
+        return Futures.successful(null);
+    }
+}
index 6b2cc2203844198bc546ab509695d3c134a35a8b..2f53d4a4ebef7121cf20f2241e97929b6e324964 100644 (file)
@@ -1,5 +1,6 @@
 akka {
     persistence.snapshot-store.plugin = "mock-snapshot-store"
+    persistence.journal.plugin = "mock-journal"
 
     loglevel = "DEBUG"
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
@@ -28,3 +29,10 @@ mock-snapshot-store {
   # Dispatcher for the plugin actor.
   plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
 }
+
+mock-journal {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}