import akka.persistence.UntypedPersistentActor;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
} else if (message instanceof ReplicatedLogEntry) {
ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
- // Apply State immediately
+ LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
replicatedLog.append(logEntry);
- applyState(null, "recovery", logEntry.getData());
- context.setLastApplied(logEntry.getIndex());
- context.setCommitIndex(logEntry.getIndex());
+
+ } else if (message instanceof ApplyLogEntries) {
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+
+ LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+ context.getLastApplied() + 1, ale.getToIndex());
+
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ applyState(null, "recovery", replicatedLog.get(i).getData());
+ }
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
+ } else if (message instanceof ApplyLogEntries){
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+ LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ @Override
+ public void apply(ApplyLogEntries param) throws Exception {
+ }
+ });
+
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.protobuf.ByteString;
+import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class RaftActorTest extends AbstractActorTest {
+ @After
+ public void tearDown() {
+ MockAkkaJournal.clearJournal();
+ MockSnapshotStore.setMockSnapshot(null);
+ }
+
public static class MockRaftActor extends RaftActor {
- boolean applySnapshotCalled = false;
+ private boolean applySnapshotCalled = false;
+ private List<Object> state;
public MockRaftActor(String id,
Map<String, String> peerAddresses) {
super(id, peerAddresses);
+ state = new ArrayList<>();
}
public RaftActorContext getRaftActorContext() {
return applySnapshotCalled;
}
+ public List<Object> getState() {
+ return state;
+ }
+
public static Props props(final String id, final Map<String, String> peerAddresses){
return Props.create(new Creator<MockRaftActor>(){
});
}
- @Override protected void applyState(ActorRef clientActor,
- String identifier,
- Object data) {
+ @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
+ state.add(data);
}
@Override protected void createSnapshot() {
}
@Override protected void applySnapshot(ByteString snapshot) {
- applySnapshotCalled = true;
+ applySnapshotCalled = true;
+ try {
+ Object data = toObject(snapshot);
+ if (data instanceof List) {
+ state.addAll((List) data);
+ }
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
@Override protected void onStateChanged() {
return this.getId();
}
+ private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
+ Object obj = null;
+ ByteArrayInputStream bis = null;
+ ObjectInputStream ois = null;
+ try {
+ bis = new ByteArrayInputStream(bs.toByteArray());
+ ois = new ObjectInputStream(bis);
+ obj = ois.readObject();
+ } finally {
+ if (bis != null) {
+ bis.close();
+ }
+ if (ois != null) {
+ ois.close();
+ }
+ }
+ return obj;
+ }
+
+
}
}
@Test
- public void testActorRecovery() {
+ public void testRaftActorRecovery() {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
protected void run() {
ActorRef followerActor = getSystem().actorOf(
MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
+ List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+ ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
+ snapshotUnappliedEntries.add(entry1);
+
+ int lastAppliedDuringSnapshotCapture = 3;
+ int lastIndexDuringSnapshotCapture = 4;
+ ByteString snapshotBytes = null;
+ try {
+ // 4 messages as part of snapshot, which are applied to state
+ snapshotBytes = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
+ new MockRaftActorContext.MockPayload("B"),
+ new MockRaftActorContext.MockPayload("C"),
+ new MockRaftActorContext.MockPayload("D")));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
+ snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
+ lastAppliedDuringSnapshotCapture, 1);
+ MockSnapshotStore.setMockSnapshot(snapshot);
+ MockSnapshotStore.setPersistenceId(persistenceId);
+
+ // add more entries after snapshot is taken
List<ReplicatedLogEntry> entries = new ArrayList<>();
- ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
- entries.add(entry1);
+ ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
+ ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
entries.add(entry2);
+ entries.add(entry3);
+ entries.add(entry4);
- int lastApplied = 3;
- int lastIndex = 5;
- Snapshot snapshot = Snapshot.create("A B C D".getBytes(), entries, lastIndex, 1 , lastApplied, 1);
- MockSnapshotStore.setMockSnapshot(snapshot);
- MockSnapshotStore.setPersistenceId(persistenceId);
+ int lastAppliedToState = 5;
+ int lastIndex = 7;
+
+ MockAkkaJournal.addToJournal(5, entry2);
+ // 2 entries are applied to state besides the 4 entries in snapshot
+ MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
+ MockAkkaJournal.addToJournal(7, entry3);
+ MockAkkaJournal.addToJournal(8, entry4);
+ // kill the actor
followerActor.tell(PoisonPill.getInstance(), null);
+
try {
// give some time for actor to die
Thread.sleep(200);
e.printStackTrace();
}
- TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
+ //reinstate the actor
+ TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
+
try {
//give some time for snapshot offer to get called.
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
+
RaftActorContext context = ref.underlyingActor().getRaftActorContext();
- assertEquals(entries.size(), context.getReplicatedLog().size());
- assertEquals(lastApplied, context.getLastApplied());
- assertEquals(lastApplied, context.getCommitIndex());
+ assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
+ assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals(lastAppliedToState, context.getLastApplied());
+ assertEquals(lastAppliedToState, context.getCommitIndex());
assertTrue(ref.underlyingActor().isApplySnapshotCalled());
+ assertEquals(6, ref.underlyingActor().getState().size());
}
-
};
}};
}
-
+ private ByteString fromObject(Object snapshot) throws Exception {
+ ByteArrayOutputStream b = null;
+ ObjectOutputStream o = null;
+ try {
+ b = new ByteArrayOutputStream();
+ o = new ObjectOutputStream(b);
+ o.writeObject(snapshot);
+ byte[] snapshotBytes = b.toByteArray();
+ return ByteString.copyFrom(snapshotBytes);
+ } finally {
+ if (o != null) {
+ o.flush();
+ o.close();
+ }
+ if (b != null) {
+ b.close();
+ }
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Procedure;
+import akka.persistence.PersistentConfirmation;
+import akka.persistence.PersistentId;
+import akka.persistence.PersistentImpl;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.google.common.collect.Maps;
+import scala.concurrent.Future;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class MockAkkaJournal extends AsyncWriteJournal {
+
+ private static Map<Long, Object> journal = Maps.newHashMap();
+
+ public static void addToJournal(long sequenceNr, Object message) {
+ journal.put(sequenceNr, message);
+ }
+
+ public static void clearJournal() {
+ journal.clear();
+ }
+
+ @Override
+ public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr,
+ long toSequenceNr, long max, final Procedure<PersistentRepr> replayCallback) {
+
+ return Futures.future(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (Map.Entry<Long,Object> entry : journal.entrySet()) {
+ PersistentRepr persistentMessage =
+ new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, false, null, null);
+ replayCallback.apply(persistentMessage);
+ }
+ return null;
+ }
+ }, context().dispatcher());
+ }
+
+ @Override
+ public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
+ return Futures.successful(new Long(0));
+ }
+
+ @Override
+ public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> persistentReprs) {
+ return Futures.successful(null);
+ }
+
+ @Override
+ public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> persistentConfirmations) {
+ return Futures.successful(null);
+ }
+
+ @Override
+ public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds, boolean b) {
+ return Futures.successful(null);
+ }
+
+ @Override
+ public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
+ return Futures.successful(null);
+ }
+}