1 package org.opendaylight.controller.cluster.raft;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
import scala.concurrent.duration.FiniteDuration;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
37 public class RaftActorTest extends AbstractActorTest {
41 public void tearDown() {
42 MockAkkaJournal.clearJournal();
43 MockSnapshotStore.setMockSnapshot(null);
46 public static class MockRaftActor extends RaftActor {
48 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
49 private final Map<String, String> peerAddresses;
50 private final String id;
51 private final Optional<ConfigParams> config;
53 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
54 Optional<ConfigParams> config) {
55 this.peerAddresses = peerAddresses;
61 public MockRaftActor create() throws Exception {
62 return new MockRaftActor(id, peerAddresses, config);
66 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
67 private final List<Object> state;
69 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
70 super(id, peerAddresses, config);
71 state = new ArrayList<>();
74 public void waitForRecoveryComplete() {
76 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
77 } catch (InterruptedException e) {
82 public List<Object> getState() {
86 public static Props props(final String id, final Map<String, String> peerAddresses,
87 Optional<ConfigParams> config){
88 return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
91 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
95 protected void startLogRecoveryBatch(int maxBatchSize) {
99 protected void appendRecoveredLogEntry(Payload data) {
104 protected void applyCurrentLogRecoveryBatch() {
108 protected void onRecoveryComplete() {
109 recoveryComplete.countDown();
113 protected void applyRecoverySnapshot(ByteString snapshot) {
115 Object data = toObject(snapshot);
116 System.out.println("!!!!!applyRecoverySnapshot: "+data);
117 if (data instanceof List) {
118 state.addAll((List) data);
120 } catch (Exception e) {
125 @Override protected void createSnapshot() {
126 throw new UnsupportedOperationException("createSnapshot");
129 @Override protected void applySnapshot(ByteString snapshot) {
132 @Override protected void onStateChanged() {
135 @Override public String persistenceId() {
139 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
141 ByteArrayInputStream bis = null;
142 ObjectInputStream ois = null;
144 bis = new ByteArrayInputStream(bs.toByteArray());
145 ois = new ObjectInputStream(bis);
146 obj = ois.readObject();
162 private static class RaftActorTestKit extends JavaTestKit {
163 private final ActorRef raftActor;
165 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
168 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
169 Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
174 public boolean waitForStartup(){
175 // Wait for a specific log message to show up
177 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
180 protected Boolean run() {
183 }.from(raftActor.path().toString())
184 .message("Switching from state Candidate to Leader")
185 .occurrences(1).exec();
190 public void findLeader(final String expectedLeader){
191 raftActor.tell(new FindLeader(), getRef());
193 FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
194 assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
197 public ActorRef getRaftActor() {
204 public void testConstruction() {
205 boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
206 assertEquals(true, started);
210 public void testFindLeaderWhenLeaderIsSelf(){
211 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
212 kit.waitForStartup();
213 kit.findLeader(kit.getRaftActor().path().toString());
217 public void testRaftActorRecovery() throws Exception {
218 new JavaTestKit(getSystem()) {{
219 String persistenceId = "follower10";
221 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
222 // Set the heartbeat interval high to essentially disable election otherwise the test
223 // may fail if the actor is switched to Leader and the commitIndex is set to the last
225 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
227 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
228 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
230 watch(followerActor);
232 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
233 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
234 new MockRaftActorContext.MockPayload("E"));
235 snapshotUnappliedEntries.add(entry1);
237 int lastAppliedDuringSnapshotCapture = 3;
238 int lastIndexDuringSnapshotCapture = 4;
240 // 4 messages as part of snapshot, which are applied to state
241 ByteString snapshotBytes = fromObject(Arrays.asList(
242 new MockRaftActorContext.MockPayload("A"),
243 new MockRaftActorContext.MockPayload("B"),
244 new MockRaftActorContext.MockPayload("C"),
245 new MockRaftActorContext.MockPayload("D")));
247 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
248 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
249 lastAppliedDuringSnapshotCapture, 1);
250 MockSnapshotStore.setMockSnapshot(snapshot);
251 MockSnapshotStore.setPersistenceId(persistenceId);
253 // add more entries after snapshot is taken
254 List<ReplicatedLogEntry> entries = new ArrayList<>();
255 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
256 new MockRaftActorContext.MockPayload("F"));
257 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
258 new MockRaftActorContext.MockPayload("G"));
259 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
260 new MockRaftActorContext.MockPayload("H"));
265 int lastAppliedToState = 5;
268 MockAkkaJournal.addToJournal(5, entry2);
269 // 2 entries are applied to state besides the 4 entries in snapshot
270 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
271 MockAkkaJournal.addToJournal(7, entry3);
272 MockAkkaJournal.addToJournal(8, entry4);
275 followerActor.tell(PoisonPill.getInstance(), null);
276 expectMsgClass(duration("5 seconds"), Terminated.class);
278 unwatch(followerActor);
280 //reinstate the actor
281 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
282 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
283 Optional.<ConfigParams>of(config)));
285 ref.underlyingActor().waitForRecoveryComplete();
287 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
288 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
289 context.getReplicatedLog().size());
290 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
291 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
292 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
293 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
297 private ByteString fromObject(Object snapshot) throws Exception {
298 ByteArrayOutputStream b = null;
299 ObjectOutputStream o = null;
301 b = new ByteArrayOutputStream();
302 o = new ObjectOutputStream(b);
303 o.writeObject(snapshot);
304 byte[] snapshotBytes = b.toByteArray();
305 return ByteString.copyFrom(snapshotBytes);