Merge "BUG 1839 - HTTP delete of non existing data"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.event.Logging;
9 import akka.japi.Creator;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.base.Optional;
13 import com.google.protobuf.ByteString;
14 import org.junit.After;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
17 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
18 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
19 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
20 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
21 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
22 import scala.concurrent.duration.FiniteDuration;
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.ObjectInputStream;
27 import java.io.ObjectOutputStream;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import static org.junit.Assert.assertEquals;
36
37 public class RaftActorTest extends AbstractActorTest {
38
39
40     @After
41     public void tearDown() {
42         MockAkkaJournal.clearJournal();
43         MockSnapshotStore.setMockSnapshot(null);
44     }
45
46     public static class MockRaftActor extends RaftActor {
47
48         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
49             private final Map<String, String> peerAddresses;
50             private final String id;
51             private final Optional<ConfigParams> config;
52
53             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
54                     Optional<ConfigParams> config) {
55                 this.peerAddresses = peerAddresses;
56                 this.id = id;
57                 this.config = config;
58             }
59
60             @Override
61             public MockRaftActor create() throws Exception {
62                 return new MockRaftActor(id, peerAddresses, config);
63             }
64         }
65
66         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
67         private final List<Object> state;
68
69         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config) {
70             super(id, peerAddresses, config);
71             state = new ArrayList<>();
72         }
73
74         public void waitForRecoveryComplete() {
75             try {
76                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
77             } catch (InterruptedException e) {
78                 e.printStackTrace();
79             }
80         }
81
82         public List<Object> getState() {
83             return state;
84         }
85
86         public static Props props(final String id, final Map<String, String> peerAddresses,
87                 Optional<ConfigParams> config){
88             return Props.create(new MockRaftActorCreator(peerAddresses, id, config));
89         }
90
91         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
92         }
93
94         @Override
95         protected void startLogRecoveryBatch(int maxBatchSize) {
96         }
97
98         @Override
99         protected void appendRecoveredLogEntry(Payload data) {
100             state.add(data);
101         }
102
103         @Override
104         protected void applyCurrentLogRecoveryBatch() {
105         }
106
107         @Override
108         protected void onRecoveryComplete() {
109             recoveryComplete.countDown();
110         }
111
112         @Override
113         protected void applyRecoverySnapshot(ByteString snapshot) {
114             try {
115                 Object data = toObject(snapshot);
116                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
117                 if (data instanceof List) {
118                     state.addAll((List) data);
119                 }
120             } catch (Exception e) {
121                 e.printStackTrace();
122             }
123         }
124
125         @Override protected void createSnapshot() {
126             throw new UnsupportedOperationException("createSnapshot");
127         }
128
129         @Override protected void applySnapshot(ByteString snapshot) {
130         }
131
132         @Override protected void onStateChanged() {
133         }
134
135         @Override public String persistenceId() {
136             return this.getId();
137         }
138
139         private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
140             Object obj = null;
141             ByteArrayInputStream bis = null;
142             ObjectInputStream ois = null;
143             try {
144                 bis = new ByteArrayInputStream(bs.toByteArray());
145                 ois = new ObjectInputStream(bis);
146                 obj = ois.readObject();
147             } finally {
148                 if (bis != null) {
149                     bis.close();
150                 }
151                 if (ois != null) {
152                     ois.close();
153                 }
154             }
155             return obj;
156         }
157
158
159     }
160
161
162     private static class RaftActorTestKit extends JavaTestKit {
163         private final ActorRef raftActor;
164
165         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
166             super(actorSystem);
167
168             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
169                     Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
170
171         }
172
173
174         public boolean waitForStartup(){
175             // Wait for a specific log message to show up
176             return
177                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
178                 ) {
179                     @Override
180                     protected Boolean run() {
181                         return true;
182                     }
183                 }.from(raftActor.path().toString())
184                     .message("Switching from behavior Candidate to Leader")
185                     .occurrences(1).exec();
186
187
188         }
189
190         public void findLeader(final String expectedLeader){
191             raftActor.tell(new FindLeader(), getRef());
192
193             FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
194             assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
195         }
196
197         public ActorRef getRaftActor() {
198             return raftActor;
199         }
200     }
201
202
203     @Test
204     public void testConstruction() {
205         boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
206         assertEquals(true, started);
207     }
208
209     @Test
210     public void testFindLeaderWhenLeaderIsSelf(){
211         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
212         kit.waitForStartup();
213         kit.findLeader(kit.getRaftActor().path().toString());
214     }
215
216     @Test
217     public void testRaftActorRecovery() throws Exception {
218         new JavaTestKit(getSystem()) {{
219             String persistenceId = "follower10";
220
221             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
222             // Set the heartbeat interval high to essentially disable election otherwise the test
223             // may fail if the actor is switched to Leader and the commitIndex is set to the last
224             // log entry.
225             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
226
227             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
228                     Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
229
230             watch(followerActor);
231
232             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
233             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
234                     new MockRaftActorContext.MockPayload("E"));
235             snapshotUnappliedEntries.add(entry1);
236
237             int lastAppliedDuringSnapshotCapture = 3;
238             int lastIndexDuringSnapshotCapture = 4;
239
240                 // 4 messages as part of snapshot, which are applied to state
241             ByteString snapshotBytes  = fromObject(Arrays.asList(
242                         new MockRaftActorContext.MockPayload("A"),
243                         new MockRaftActorContext.MockPayload("B"),
244                         new MockRaftActorContext.MockPayload("C"),
245                         new MockRaftActorContext.MockPayload("D")));
246
247             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
248                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
249                     lastAppliedDuringSnapshotCapture, 1);
250             MockSnapshotStore.setMockSnapshot(snapshot);
251             MockSnapshotStore.setPersistenceId(persistenceId);
252
253             // add more entries after snapshot is taken
254             List<ReplicatedLogEntry> entries = new ArrayList<>();
255             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
256                     new MockRaftActorContext.MockPayload("F"));
257             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
258                     new MockRaftActorContext.MockPayload("G"));
259             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
260                     new MockRaftActorContext.MockPayload("H"));
261             entries.add(entry2);
262             entries.add(entry3);
263             entries.add(entry4);
264
265             int lastAppliedToState = 5;
266             int lastIndex = 7;
267
268             MockAkkaJournal.addToJournal(5, entry2);
269             // 2 entries are applied to state besides the 4 entries in snapshot
270             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
271             MockAkkaJournal.addToJournal(7, entry3);
272             MockAkkaJournal.addToJournal(8, entry4);
273
274             // kill the actor
275             followerActor.tell(PoisonPill.getInstance(), null);
276             expectMsgClass(duration("5 seconds"), Terminated.class);
277
278             unwatch(followerActor);
279
280             //reinstate the actor
281             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
282                     MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
283                             Optional.<ConfigParams>of(config)));
284
285             ref.underlyingActor().waitForRecoveryComplete();
286
287             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
288             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
289                     context.getReplicatedLog().size());
290             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
291             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
292             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
293             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
294         }};
295     }
296
297     private ByteString fromObject(Object snapshot) throws Exception {
298         ByteArrayOutputStream b = null;
299         ObjectOutputStream o = null;
300         try {
301             b = new ByteArrayOutputStream();
302             o = new ObjectOutputStream(b);
303             o.writeObject(snapshot);
304             byte[] snapshotBytes = b.toByteArray();
305             return ByteString.copyFrom(snapshotBytes);
306         } finally {
307             if (o != null) {
308                 o.flush();
309                 o.close();
310             }
311             if (b != null) {
312                 b.close();
313             }
314         }
315     }
316 }