1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.event.Logging;
8 import akka.japi.Creator;
9 import akka.testkit.JavaTestKit;
10 import akka.testkit.TestActorRef;
11 import com.google.protobuf.ByteString;
12 import org.junit.After;
13 import org.junit.Test;
14 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
15 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
16 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
17 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
18 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
20 import java.io.ByteArrayInputStream;
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.ObjectInputStream;
24 import java.io.ObjectOutputStream;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.List;
31 import static junit.framework.Assert.assertTrue;
32 import static junit.framework.TestCase.assertEquals;
34 public class RaftActorTest extends AbstractActorTest {
// Resets the shared mock persistence fixtures used by the tests in this class:
// calls MockAkkaJournal.clearJournal() and sets the MockSnapshotStore snapshot to null.
// NOTE(review): the annotation line is not visible in this listing; org.junit.After is
// imported, so this is presumably the per-test cleanup hook -- confirm.
38 public void tearDown() {
39 MockAkkaJournal.clearJournal();
40 MockSnapshotStore.setMockSnapshot(null);
// RaftActor subclass used as the actor under test. It records whether applySnapshot()
// was invoked and accumulates the deserialized snapshot contents in `state` so that
// tests can inspect them afterwards.
// NOTE(review): several method bodies are not visible in this listing; comments below
// describe only the statements that are shown.
43 public static class MockRaftActor extends RaftActor {
// Set to true the first time applySnapshot() runs; never reset in the visible code.
45 private boolean applySnapshotCalled = false;
// Elements collected from applied snapshots (see applySnapshot()).
46 private List<Object> state;
// Passes id and peerAddresses straight to the RaftActor constructor and starts
// with an empty state list.
48 public MockRaftActor(String id,
49 Map<String, String> peerAddresses) {
50 super(id, peerAddresses);
51 state = new ArrayList<>();
// Accessor for the actor's RaftActorContext; body not visible here --
// presumably returns the inherited context, confirm.
54 public RaftActorContext getRaftActorContext() {
// Returns whether applySnapshot() has been called on this instance.
58 public boolean isApplySnapshotCalled() {
59 return applySnapshotCalled;
// Accessor for the collected state; body not visible here -- presumably returns `state`.
62 public List<Object> getState() {
// Builds the Props for this actor through an Akka Creator that constructs a
// MockRaftActor with the given id and peerAddresses.
66 public static Props props(final String id, final Map<String, String> peerAddresses){
67 return Props.create(new Creator<MockRaftActor>(){
69 @Override public MockRaftActor create() throws Exception {
70 return new MockRaftActor(id, peerAddresses);
// Body not visible in this listing; TODO confirm what is done with `data`.
75 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
// Snapshot creation is not supported by this mock; always throws.
79 @Override protected void createSnapshot() {
80 throw new UnsupportedOperationException("createSnapshot");
// Marks the snapshot as applied, deserializes it via toObject() and, when the
// result is a List, appends all of its elements to `state`.
// NOTE(review): the handlers for the two catch clauses are not visible here.
83 @Override protected void applySnapshot(ByteString snapshot) {
84 applySnapshotCalled = true;
86 Object data = toObject(snapshot);
87 if (data instanceof List) {
// NOTE(review): raw List cast -- produces an unchecked warning.
88 state.addAll((List) data);
90 } catch (ClassNotFoundException e) {
92 } catch (IOException e) {
// Body not visible in this listing.
97 @Override protected void onStateChanged() {
// Body not visible in this listing -- presumably returns the actor id; confirm.
100 @Override public String persistenceId() {
// Deserializes one Java-serialized object from the bytes of `bs`.
// The declaration of `obj`, the try/finally scaffolding and the return statement
// are not visible in this listing.
// NOTE(review): this is Java native deserialization; in the visible code its only
// caller is applySnapshot().
104 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
106 ByteArrayInputStream bis = null;
107 ObjectInputStream ois = null;
109 bis = new ByteArrayInputStream(bs.toByteArray());
110 ois = new ObjectInputStream(bis);
111 obj = ois.readObject();
// JavaTestKit subclass that starts a MockRaftActor under a given name with an empty
// peer map and offers helpers to wait for its startup and to query its leader.
127 private static class RaftActorTestKit extends JavaTestKit {
// The MockRaftActor created by the constructor.
128 private final ActorRef raftActor;
// Creates the actor named `actorName` in this kit's actor system.
// NOTE(review): the lines between the signature and the assignment are not visible
// (presumably the super(actorSystem) call) -- confirm.
// NOTE(review): Collections.EMPTY_MAP is a raw Map; Collections.<String, String>emptyMap()
// would avoid the unchecked conversion.
130 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
133 raftActor = this.getSystem()
134 .actorOf(MockRaftActor.props(actorName,
135 Collections.EMPTY_MAP), actorName);
// Runs an EventFilter for exactly one Info-level log event coming from the actor's path
// with the message "Switching from state Candidate to Leader".
// NOTE(review): the body of run() and the return statement are not visible here.
140 public boolean waitForStartup(){
141 // Wait for a specific log message to show up
143 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
145 protected Boolean run() {
148 }.from(raftActor.path().toString())
149 .message("Switching from state Candidate to Leader")
150 .occurrences(1).exec();
// Sends FindLeader to the actor (with this kit's ref as sender) inside a 1 second
// Within block, extracts getLeaderActor() from the FindLeaderReply that comes back
// and asserts that it equals `expectedLeader`.
// NOTE(review): the branch taken when the message is not a FindLeaderReply is not
// visible in this listing.
155 public void findLeader(final String expectedLeader){
158 new Within(duration("1 seconds")) {
159 protected void run() {
161 raftActor.tell(new FindLeader(), getRef());
163 String s = new ExpectMsg<String>(duration("1 seconds"),
165 // do not put code outside this method, will run afterwards
166 protected String match(Object in) {
167 if (in instanceof FindLeaderReply) {
168 return ((FindLeaderReply) in).getLeaderActor();
173 }.get();// this extracts the received message
175 assertEquals(expectedLeader, s);
// Accessor for the actor under test; body not visible here -- presumably returns `raftActor`.
183 public ActorRef getRaftActor() {
// Starts a MockRaftActor named "testConstruction" through RaftActorTestKit and asserts
// that waitForStartup() reports true.
// NOTE(review): the annotation line is not visible in this listing.
191 public void testConstruction() {
192 boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
193 assertEquals(true, started);
// Starts a peerless MockRaftActor named "testFindLeader", waits for startup, then checks
// that FindLeader is answered with the actor's own path string.
// The result of waitForStartup() is not checked here.
197 public void testFindLeaderWhenLeaderIsSelf(){
198 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
199 kit.waitForStartup();
200 kit.findLeader(kit.getRaftActor().path().toString());
// Recovery scenario: seeds MockSnapshotStore and MockAkkaJournal for persistence id
// "follower10", stops the first actor, creates a second MockRaftActor via TestActorRef
// and asserts on the replicated log, lastApplied, commitIndex and the mock's state.
// NOTE(review): several lines are not visible in this listing (try openers, catch
// bodies, the waits, the population of `entries`, the declaration of `lastIndex`).
204 public void testRaftActorRecovery() {
205 new JavaTestKit(getSystem()) {{
206 new Within(duration("1 seconds")) {
207 protected void run() {
209 String persistenceId = "follower10";
// First incarnation of the actor, registered under the persistence id as its name.
211 ActorRef followerActor = getSystem().actorOf(
212 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP), persistenceId);
// Log entry "E", built with arguments (1, 4), that goes into the snapshot's
// unapplied-entries list.
214 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
215 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("E"));
216 snapshotUnappliedEntries.add(entry1);
218 int lastAppliedDuringSnapshotCapture = 3;
219 int lastIndexDuringSnapshotCapture = 4;
// NOTE(review): starts as null; if fromObject() throws and the handler (not visible
// here) does not abort, the toByteArray() call below would throw NullPointerException.
221 ByteString snapshotBytes = null;
223 // 4 messages as part of snapshot, which are applied to state
224 snapshotBytes = fromObject(Arrays.asList(new MockRaftActorContext.MockPayload("A"),
225 new MockRaftActorContext.MockPayload("B"),
226 new MockRaftActorContext.MockPayload("C"),
227 new MockRaftActorContext.MockPayload("D")));
228 } catch (Exception e) {
// Builds the snapshot and registers it with the mock snapshot store for this id.
// NOTE(review): the two literal 1 arguments are presumably term values -- confirm
// against Snapshot.create.
231 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
232 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
233 lastAppliedDuringSnapshotCapture, 1);
234 MockSnapshotStore.setMockSnapshot(snapshot);
235 MockSnapshotStore.setPersistenceId(persistenceId);
237 // add more entries after snapshot is taken
// NOTE(review): the lines that add entry2..entry4 to `entries` are not visible here.
238 List<ReplicatedLogEntry> entries = new ArrayList<>();
239 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("F"));
240 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("G"));
241 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("H"));
246 int lastAppliedToState = 5;
// Journal contents in order: entry2, an ApplyLogEntries(5) marker, entry3, entry4.
// The first argument to addToJournal is 5..8 (presumably the journal sequence
// number -- confirm against MockAkkaJournal).
249 MockAkkaJournal.addToJournal(5, entry2);
250 // 2 entries are applied to state besides the 4 entries in snapshot
251 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
252 MockAkkaJournal.addToJournal(7, entry3);
253 MockAkkaJournal.addToJournal(8, entry4);
// Stops the first actor. NOTE(review): the sender is passed as null.
256 followerActor.tell(PoisonPill.getInstance(), null);
259 // give some time for actor to die
// NOTE(review): the wait itself is not visible here; presumably a Thread.sleep,
// i.e. timing-based synchronisation.
261 } catch (InterruptedException e) {
265 //reinstate the actor
266 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
267 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP));
270 //give some time for snapshot offer to get called.
272 } catch (InterruptedException e) {
// Expectations on the second actor: log size equals unapplied snapshot entries plus
// `entries`, last index equals `lastIndex` (declared in a line not visible here),
// lastApplied and commitIndex both equal 5, applySnapshot() was invoked, and the
// mock's state holds 6 elements.
276 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
277 assertEquals(snapshotUnappliedEntries.size() + entries.size(), context.getReplicatedLog().size());
278 assertEquals(lastIndex, context.getReplicatedLog().lastIndex());
279 assertEquals(lastAppliedToState, context.getLastApplied());
280 assertEquals(lastAppliedToState, context.getCommitIndex());
281 assertTrue(ref.underlyingActor().isApplySnapshotCalled());
282 assertEquals(6, ref.underlyingActor().getState().size());
// Serializes `snapshot` with Java object serialization into an in-memory buffer and
// returns the bytes as a ByteString via ByteString.copyFrom.
// NOTE(review): the try/finally scaffolding around these statements is not visible in
// this listing, so how the streams are closed cannot be confirmed from here.
289 private ByteString fromObject(Object snapshot) throws Exception {
290 ByteArrayOutputStream b = null;
291 ObjectOutputStream o = null;
293 b = new ByteArrayOutputStream();
294 o = new ObjectOutputStream(b);
295 o.writeObject(snapshot);
296 byte[] snapshotBytes = b.toByteArray();
297 return ByteString.copyFrom(snapshotBytes);