From b9cc9136fe3970ea12dfa3f0fedc803dc3cc101b Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Tue, 16 Sep 2014 23:15:32 -0700 Subject: [PATCH] Bug-1903:On recovery all replicated log entries should not be applied to state Change-Id: I8b246a813d0e2afb723510cfd187b2d411caab6b Signed-off-by: Kamal Rameshan --- .../controller/cluster/raft/RaftActor.java | 28 +++- .../raft/base/messages/ApplyLogEntries.java | 32 ++++ .../behaviors/AbstractRaftActorBehavior.java | 7 + .../cluster/raft/RaftActorTest.java | 141 +++++++++++++++--- .../cluster/raft/utils/MockAkkaJournal.java | 76 ++++++++++ .../src/test/resources/application.conf | 8 + 6 files changed, 268 insertions(+), 24 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 8270f2949a..6e1a13cf0c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -21,6 +21,7 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; import com.google.common.base.Optional; import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -147,12 +148,20 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof ReplicatedLogEntry) { ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; - - // Apply State immediately + LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex()); replicatedLog.append(logEntry); - applyState(null, "recovery", logEntry.getData()); - context.setLastApplied(logEntry.getIndex()); - context.setCommitIndex(logEntry.getIndex()); + + } else if (message instanceof ApplyLogEntries) { + ApplyLogEntries ale = (ApplyLogEntries) message; + + LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}", + context.getLastApplied() + 1, ale.getToIndex()); + + for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) { + applyState(null, "recovery", replicatedLog.get(i).getData()); + } + context.setLastApplied(ale.getToIndex()); + context.setCommitIndex(ale.getToIndex()); } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); @@ -187,6 +196,15 @@ public abstract class RaftActor extends UntypedPersistentActor { applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); + } else if (message instanceof ApplyLogEntries){ + ApplyLogEntries ale = (ApplyLogEntries) message; + LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex()); + persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { + @Override + public void apply(ApplyLogEntries param) throws Exception { + } + }); + } else if(message instanceof ApplySnapshot ) { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java new file mode 100644 index 0000000000..af3c4fd87d --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyLogEntries.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.base.messages; + +import java.io.Serializable; + +/** + * ApplyLogEntries serves as a message which is stored in the akka's persistent + * journal. + * During recovery if this message is found, then all in-mem journal entries from + * context.lastApplied to ApplyLogEntries.toIndex are applied to the state + * + * This class is also used as a internal message sent from Behaviour to + * RaftActor to persist the ApplyLogEntries + * + */ +public class ApplyLogEntries implements Serializable { + private final int toIndex; + + public ApplyLogEntries(int toIndex) { + this.toIndex = toIndex; + } + + public int getToIndex() { + return toIndex; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 35d563b784..b1560a5648 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -347,6 +348,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } context.getLogger().debug("Setting last applied to {}", newLastApplied); context.setLastApplied(newLastApplied); + + // send a message to persist a ApplyLogEntries marker message into akka's persistent journal + // will be used during recovery + //in case if the above code throws an error and this message is not sent, it would be fine + // as the append entries received later would initiate add this message to the journal + actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor()); } protected Object fromSerializableMessage(Object serializable){ diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 9b099c2aba..998c198756 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -9,12 +9,21 @@ import akka.japi.Creator; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.protobuf.ByteString; +import org.junit.After; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -25,13 +34,21 @@ import static junit.framework.TestCase.assertEquals; public class RaftActorTest extends AbstractActorTest { + @After + public void tearDown() { + MockAkkaJournal.clearJournal(); + MockSnapshotStore.setMockSnapshot(null); + } + public static class MockRaftActor extends RaftActor { - boolean applySnapshotCalled = false; + private boolean applySnapshotCalled = false; + private List state; public MockRaftActor(String id, Map peerAddresses) { super(id, peerAddresses); + state = new ArrayList<>(); } public RaftActorContext getRaftActorContext() { @@ -42,6 +59,10 @@ public class RaftActorTest extends AbstractActorTest { return applySnapshotCalled; } + public List getState() { + return state; + } + public static Props props(final String id, final Map peerAddresses){ return Props.create(new Creator(){ @@ -51,9 +72,8 @@ public class RaftActorTest extends AbstractActorTest { }); } - @Override protected void applyState(ActorRef clientActor, - String identifier, - Object data) { + @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { + state.add(data); } @Override protected void createSnapshot() { @@ -61,7 +81,17 @@ public class RaftActorTest extends AbstractActorTest { } @Override protected void applySnapshot(ByteString snapshot) { - applySnapshotCalled = true; + applySnapshotCalled = true; + try { + Object data = toObject(snapshot); + if (data instanceof List) { + state.addAll((List) data); + } + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } } @Override protected void onStateChanged() { @@ -71,6 +101,26 @@ public class RaftActorTest extends AbstractActorTest { return this.getId(); } + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + Object obj = null; + ByteArrayInputStream bis = null; + ObjectInputStream ois = null; + try { + bis = new ByteArrayInputStream(bs.toByteArray()); + ois = new ObjectInputStream(bis); + obj = ois.readObject(); + } finally { + if (bis != null) { + bis.close(); + } + if (ois != null) { + ois.close(); + } + } + return obj; + } + + } @@ -151,7 +201,7 @@ public class RaftActorTest extends AbstractActorTest { } @Test - public void testActorRecovery() { + public void testRaftActorRecovery() { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { protected void run() { @@ -161,20 +211,50 @@ public class RaftActorTest extends AbstractActorTest { ActorRef followerActor = getSystem().actorOf( MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId); + List snapshotUnappliedEntries = new ArrayList<>(); + ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E")); + snapshotUnappliedEntries.add(entry1); + + int lastAppliedDuringSnapshotCapture = 3; + int lastIndexDuringSnapshotCapture = 4; + ByteString snapshotBytes = null; + try { + // 4 messages as part of snapshot, which are applied to state + snapshotBytes = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); + } catch (Exception e) { + e.printStackTrace(); + } + Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), + snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 , + lastAppliedDuringSnapshotCapture, 1); + MockSnapshotStore.setMockSnapshot(snapshot); + MockSnapshotStore.setPersistenceId(persistenceId); + + // add more entries after snapshot is taken List entries = new ArrayList<>(); - ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E")); ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F")); - entries.add(entry1); + ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G")); + ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H")); entries.add(entry2); + entries.add(entry3); + entries.add(entry4); - int lastApplied = 3; - int lastIndex = 5; - Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1); - MockSnapshotStore.setMockSnapshot(snapshot); - MockSnapshotStore.setPersistenceId(persistenceId); + int lastAppliedToState = 5; + int lastIndex = 7; + + MockAkkaJournal.addToJournal(5, entry2); + // 2 entries are applied to state besides the 4 entries in snapshot + MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState)); + MockAkkaJournal.addToJournal(7, entry3); + MockAkkaJournal.addToJournal(8, entry4); + // kill the actor followerActor.tell(PoisonPill.getInstance(), null); + try { // give some time for actor to die Thread.sleep(200); @@ -182,24 +262,47 @@ public class RaftActorTest extends AbstractActorTest { e.printStackTrace(); } - TestActorRef ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP)); + //reinstate the actor + TestActorRef ref = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId, Collections.EMPTY_MAP)); + try { //give some time for snapshot offer to get called. Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } + RaftActorContext context = ref.underlyingActor().getRaftActorContext(); - assertEquals(entries.size(), context.getReplicatedLog().size()); - assertEquals(lastApplied, context.getLastApplied()); - assertEquals(lastApplied, context.getCommitIndex()); + assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size()); + assertEquals(lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals(lastAppliedToState, context.getLastApplied()); + assertEquals(lastAppliedToState, context.getCommitIndex()); assertTrue(ref.underlyingActor().isApplySnapshotCalled()); + assertEquals(6, ref.underlyingActor().getState().size()); } - }; }}; } - + private ByteString fromObject(Object snapshot) throws Exception { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(snapshot); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java new file mode 100644 index 0000000000..85edc07bc5 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MockAkkaJournal.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.utils; + +import akka.dispatch.Futures; +import akka.japi.Procedure; +import akka.persistence.PersistentConfirmation; +import akka.persistence.PersistentId; +import akka.persistence.PersistentImpl; +import akka.persistence.PersistentRepr; +import akka.persistence.journal.japi.AsyncWriteJournal; +import com.google.common.collect.Maps; +import scala.concurrent.Future; + +import java.util.Map; +import java.util.concurrent.Callable; + +public class MockAkkaJournal extends AsyncWriteJournal { + + private static Map journal = Maps.newHashMap(); + + public static void addToJournal(long sequenceNr, Object message) { + journal.put(sequenceNr, message); + } + + public static void clearJournal() { + journal.clear(); + } + + @Override + public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, + long toSequenceNr, long max, final Procedure replayCallback) { + + return Futures.future(new Callable() { + @Override + public Void call() throws Exception { + for (Map.Entry entry : journal.entrySet()) { + PersistentRepr persistentMessage = + new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null); + replayCallback.apply(persistentMessage); + } + return null; + } + }, context().dispatcher()); + } + + @Override + public Future doAsyncReadHighestSequenceNr(String s, long l) { + return Futures.successful(new Long(0)); + } + + @Override + public Future doAsyncWriteMessages(Iterable persistentReprs) { + return Futures.successful(null); + } + + @Override + public Future doAsyncWriteConfirmations(Iterable persistentConfirmations) { + return Futures.successful(null); + } + + @Override + public Future doAsyncDeleteMessages(Iterable persistentIds, boolean b) { + return Futures.successful(null); + } + + @Override + public Future doAsyncDeleteMessagesTo(String s, long l, boolean b) { + return Futures.successful(null); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf index 6b2cc22038..2f53d4a4eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf @@ -1,5 +1,6 @@ akka { persistence.snapshot-store.plugin = "mock-snapshot-store" + persistence.journal.plugin = "mock-journal" loglevel = "DEBUG" loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] @@ -28,3 +29,10 @@ mock-snapshot-store { # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" } + +mock-journal { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +} -- 2.36.6