Merge "BUG 2820 - LLDP refactor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertSame;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.mock;
8 import static org.mockito.Mockito.never;
9 import static org.mockito.Mockito.times;
10 import static org.mockito.Mockito.verify;
11 import static org.mockito.Mockito.when;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.AddressFromURIString;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent;
18 import akka.dispatch.Dispatchers;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.persistence.RecoveryCompleted;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Optional;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.collect.ImmutableSet;
28 import com.google.common.collect.Sets;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import com.typesafe.config.ConfigFactory;
31 import java.net.URI;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.Mock;
44 import org.mockito.MockitoAnnotations;
45 import org.opendaylight.controller.cluster.DataPersistenceProvider;
46 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
49 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
51 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
58 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
59 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
60 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
61 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
62 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
63 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
64 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
65 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
66 import org.opendaylight.controller.cluster.raft.RaftState;
67 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
68 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
69 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
71 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
73 import scala.concurrent.Await;
74 import scala.concurrent.Future;
75 import scala.concurrent.duration.FiniteDuration;
76
77 public class ShardManagerTest extends AbstractActorTest {
78     private static int ID_COUNTER = 1;
79
80     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
81     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
82
83     @Mock
84     private static CountDownLatch ready;
85
86     private static TestActorRef<MessageCollectorActor> mockShardActor;
87
88     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
89             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
90
91     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
92         String name = new ShardIdentifier(shardName, memberName,"config").toString();
93         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
94     }
95
96     @Before
97     public void setUp() {
98         MockitoAnnotations.initMocks(this);
99
100         InMemoryJournal.clear();
101
102         if(mockShardActor == null) {
103             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
104             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
105         }
106
107         mockShardActor.underlyingActor().clear();
108     }
109
110     @After
111     public void tearDown() {
112         InMemoryJournal.clear();
113     }
114
115     private Props newShardMgrProps() {
116         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
117                 datastoreContextBuilder.build(), ready);
118     }
119
120     private Props newPropsShardMgrWithMockShardActor() {
121         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
122                 new MockConfiguration());
123     }
124
125     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
126             final ClusterWrapper clusterWrapper, final Configuration config) {
127         Creator<ShardManager> creator = new Creator<ShardManager>() {
128             private static final long serialVersionUID = 1L;
129             @Override
130             public ShardManager create() throws Exception {
131                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
132                         ready, name, shardActor);
133             }
134         };
135
136         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
137     }
138
139     @Test
140     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
141         new JavaTestKit(getSystem()) {{
142             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
143
144             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
145
146             shardManager.tell(new FindPrimary("non-existent", false), getRef());
147
148             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
149         }};
150     }
151
152     @Test
153     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
154         new JavaTestKit(getSystem()) {{
155             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
156
157             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
158
159             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
160             shardManager.tell(new ActorInitialized(), mockShardActor);
161
162             DataTree mockDataTree = mock(DataTree.class);
163             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef());
164
165             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
166             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
167                     RaftState.Leader.name())), mockShardActor);
168
169             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
170
171             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
172             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
173                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
174             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
175         }};
176     }
177
178     @Test
179     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
180         new JavaTestKit(getSystem()) {{
181             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
182
183             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
184             shardManager.tell(new ActorInitialized(), mockShardActor);
185
186             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
187             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
188             shardManager.tell(new RoleChangeNotification(memberId1,
189                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
190             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
191
192             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
193
194             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
195         }};
196     }
197
198     @Test
199     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
200         new JavaTestKit(getSystem()) {{
201             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
202
203             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
204             shardManager.tell(new ActorInitialized(), mockShardActor);
205
206             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
207             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
208
209             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
210             shardManager.tell(new RoleChangeNotification(memberId1,
211                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
212             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent()), mockShardActor);
213
214             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
215
216             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
217             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
218                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
219         }};
220     }
221
222     @Test
223     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
224         new JavaTestKit(getSystem()) {{
225             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
226
227             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
228
229             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
230         }};
231     }
232
233     @Test
234     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
235         new JavaTestKit(getSystem()) {{
236             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
237
238             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
239             shardManager.tell(new ActorInitialized(), mockShardActor);
240
241             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
242
243             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
244         }};
245     }
246
247     @Test
248     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
249         new JavaTestKit(getSystem()) {{
250             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
251
252             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
253             shardManager.tell(new ActorInitialized(), mockShardActor);
254
255             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
256             shardManager.tell(new RoleChangeNotification(memberId,
257                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
258
259             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
260
261             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
262
263             DataTree mockDataTree = mock(DataTree.class);
264             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
265
266             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
267
268             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
269             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
270                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
271             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
272         }};
273     }
274
275     @Test
276     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
277         new JavaTestKit(getSystem()) {{
278             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
279
280             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
281
282             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
283             // delayed until we send ActorInitialized and RoleChangeNotification.
284             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
285
286             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
287
288             shardManager.tell(new ActorInitialized(), mockShardActor);
289
290             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
291
292             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
293             shardManager.tell(new RoleChangeNotification(memberId,
294                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
295
296             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
297
298             DataTree mockDataTree = mock(DataTree.class);
299             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
300
301             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
302             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
303                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
304             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
305
306             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
307         }};
308     }
309
310     @Test
311     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
312         new JavaTestKit(getSystem()) {{
313             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
314
315             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
316
317             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
318
319             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
320
321             shardManager.tell(new ActorInitialized(), mockShardActor);
322
323             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
324         }};
325     }
326
327     @Test
328     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
329         new JavaTestKit(getSystem()) {{
330             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
331
332             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
333             shardManager.tell(new ActorInitialized(), mockShardActor);
334             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
335                     null, RaftState.Candidate.name()), mockShardActor);
336
337             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
338
339             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
340         }};
341     }
342
343     @Test
344     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
345         new JavaTestKit(getSystem()) {{
346             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
347
348             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
349             shardManager.tell(new ActorInitialized(), mockShardActor);
350
351             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
352
353             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
354         }};
355     }
356
357     @Test
358     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
359         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
360
361         // Create an ActorSystem ShardManager actor for member-1.
362
363         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
364         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
365
366         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
367
368         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
369                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
370                         new MockConfiguration()), shardManagerID);
371
372         // Create an ActorSystem ShardManager actor for member-2.
373
374         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
375
376         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
377
378         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
379
380         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
381                 put("default", Arrays.asList("member-1", "member-2")).
382                 put("astronauts", Arrays.asList("member-2")).build());
383
384         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
385                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
386                         mockConfig2), shardManagerID);
387
388         new JavaTestKit(system1) {{
389
390             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
391             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
392
393             shardManager2.tell(new ActorInitialized(), mockShardActor2);
394
395             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
396             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
397                     Optional.of(mock(DataTree.class))), mockShardActor2);
398             shardManager2.tell(new RoleChangeNotification(memberId2,
399                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
400
401             shardManager1.underlyingActor().waitForMemberUp();
402
403             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
404
405             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
406             String path = found.getPrimaryPath();
407             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
408
409             shardManager2.underlyingActor().verifyFindPrimary();
410
411             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
412
413             shardManager1.underlyingActor().waitForMemberRemoved();
414
415             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
416
417             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
418         }};
419
420         JavaTestKit.shutdownActorSystem(system1);
421         JavaTestKit.shutdownActorSystem(system2);
422     }
423
424     @Test
425     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
426         new JavaTestKit(getSystem()) {{
427             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
428
429             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
430
431             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
432
433             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
434
435             assertEquals("getShardName", "non-existent", notFound.getShardName());
436         }};
437     }
438
439     @Test
440     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
441         new JavaTestKit(getSystem()) {{
442             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
443
444             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
445             shardManager.tell(new ActorInitialized(), mockShardActor);
446
447             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
448
449             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
450
451             assertTrue("Found path contains " + found.getPath().path().toString(),
452                     found.getPath().path().toString().contains("member-1-shard-default-config"));
453         }};
454     }
455
456     @Test
457     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
458         new JavaTestKit(getSystem()) {{
459             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
460
461             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
462
463             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
464         }};
465     }
466
467     @Test
468     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
469         new JavaTestKit(getSystem()) {{
470             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
471
472             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
473
474             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
475             // delayed until we send ActorInitialized.
476             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
477                     new Timeout(5, TimeUnit.SECONDS));
478
479             shardManager.tell(new ActorInitialized(), mockShardActor);
480
481             Object resp = Await.result(future, duration("5 seconds"));
482             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
483         }};
484     }
485
486     @Test
487     public void testOnRecoveryJournalIsCleaned() {
488         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
489                 ImmutableSet.of("foo")));
490         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
491                 ImmutableSet.of("bar")));
492         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
493
494         new JavaTestKit(getSystem()) {{
495             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
496                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
497
498             shardManager.underlyingActor().waitForRecoveryComplete();
499             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
500
501             // Journal entries up to the last one should've been deleted
502             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
503             synchronized (journal) {
504                 assertEquals("Journal size", 1, journal.size());
505                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
506             }
507         }};
508     }
509
510     @Test
511     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
512         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
513         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
514                 persistedModules));
515         new JavaTestKit(getSystem()) {{
516             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
517                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
518
519             shardManager.underlyingActor().waitForRecoveryComplete();
520
521             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
522
523             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
524         }};
525     }
526
527     @Test
528     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
529             throws Exception {
530         new JavaTestKit(getSystem()) {{
531             final TestActorRef<ShardManager> shardManager =
532                     TestActorRef.create(getSystem(), newShardMgrProps());
533
534             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
535
536             ModuleIdentifier foo = mock(ModuleIdentifier.class);
537             when(foo.getNamespace()).thenReturn(new URI("foo"));
538
539             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
540             moduleIdentifierSet.add(foo);
541
542             SchemaContext schemaContext = mock(SchemaContext.class);
543             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
544
545             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
546
547             assertEquals("getKnownModules", Sets.newHashSet("foo"),
548                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
549
550             ModuleIdentifier bar = mock(ModuleIdentifier.class);
551             when(bar.getNamespace()).thenReturn(new URI("bar"));
552
553             moduleIdentifierSet.add(bar);
554
555             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
556
557             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
558                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
559         }};
560     }
561
562     @Test
563     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
564             throws Exception {
565         new JavaTestKit(getSystem()) {{
566             final TestActorRef<ShardManager> shardManager =
567                     TestActorRef.create(getSystem(), newShardMgrProps());
568
569             SchemaContext schemaContext = mock(SchemaContext.class);
570             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
571
572             ModuleIdentifier foo = mock(ModuleIdentifier.class);
573             when(foo.getNamespace()).thenReturn(new URI("foo"));
574
575             moduleIdentifierSet.add(foo);
576
577             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
578
579             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
580
581             assertEquals("getKnownModules", Sets.newHashSet("foo"),
582                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
583
584             //Create a completely different SchemaContext with only the bar module in it
585             //schemaContext = mock(SchemaContext.class);
586             moduleIdentifierSet.clear();
587             ModuleIdentifier bar = mock(ModuleIdentifier.class);
588             when(bar.getNamespace()).thenReturn(new URI("bar"));
589
590             moduleIdentifierSet.add(bar);
591
592             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
593
594             assertEquals("getKnownModules", Sets.newHashSet("foo"),
595                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
596
597         }};
598     }
599
600     @Test
601     public void testRecoveryApplicable(){
602         new JavaTestKit(getSystem()) {
603             {
604                 final Props persistentProps = ShardManager.props(
605                         new MockClusterWrapper(),
606                         new MockConfiguration(),
607                         DatastoreContext.newBuilder().persistent(true).build(), ready);
608                 final TestActorRef<ShardManager> persistentShardManager =
609                         TestActorRef.create(getSystem(), persistentProps);
610
611                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
612
613                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
614
615                 final Props nonPersistentProps = ShardManager.props(
616                         new MockClusterWrapper(),
617                         new MockConfiguration(),
618                         DatastoreContext.newBuilder().persistent(false).build(), ready);
619                 final TestActorRef<ShardManager> nonPersistentShardManager =
620                         TestActorRef.create(getSystem(), nonPersistentProps);
621
622                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
623
624                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
625
626
627             }};
628
629     }
630
631     @Test
632     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
633             throws Exception {
634         final CountDownLatch persistLatch = new CountDownLatch(1);
635         final Creator<ShardManager> creator = new Creator<ShardManager>() {
636             private static final long serialVersionUID = 1L;
637             @Override
638             public ShardManager create() throws Exception {
639                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
640                     @Override
641                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
642                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
643                                 = new DataPersistenceProviderMonitor();
644                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
645                         return dataPersistenceProviderMonitor;
646                     }
647                 };
648             }
649         };
650
651         new JavaTestKit(getSystem()) {{
652
653             final TestActorRef<ShardManager> shardManager =
654                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
655
656             ModuleIdentifier foo = mock(ModuleIdentifier.class);
657             when(foo.getNamespace()).thenReturn(new URI("foo"));
658
659             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
660             moduleIdentifierSet.add(foo);
661
662             SchemaContext schemaContext = mock(SchemaContext.class);
663             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
664
665             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
666
667             assertEquals("Persisted", true,
668                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
669
670         }};
671     }
672
673     @Test
674     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
675         new JavaTestKit(getSystem()) {
676             {
677                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
678
679                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
680                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
681                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
682
683                 verify(ready, never()).countDown();
684
685                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
686                         Optional.of(mock(DataTree.class))));
687
688                 verify(ready, times(1)).countDown();
689
690             }};
691     }
692
693     @Test
694     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
695         new JavaTestKit(getSystem()) {
696             {
697                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
698
699                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
700                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
701                         memberId, null, RaftState.Follower.name()));
702
703                 verify(ready, never()).countDown();
704
705                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
706
707                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
708                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
709
710                 verify(ready, times(1)).countDown();
711
712             }};
713     }
714
715     @Test
716     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
717         new JavaTestKit(getSystem()) {
718             {
719                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
720
721                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
722                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
723                         memberId, null, RaftState.Follower.name()));
724
725                 verify(ready, never()).countDown();
726
727                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
728                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
729
730                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
731
732                 verify(ready, times(1)).countDown();
733
734             }};
735     }
736
737     @Test
738     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
739         new JavaTestKit(getSystem()) {
740             {
741                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
742
743                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
744                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
745
746                 verify(ready, never()).countDown();
747
748             }};
749     }
750
751
752     @Test
753     public void testByDefaultSyncStatusIsFalse() throws Exception{
754         final Props persistentProps = ShardManager.props(
755                 new MockClusterWrapper(),
756                 new MockConfiguration(),
757                 DatastoreContext.newBuilder().persistent(true).build(), ready);
758         final TestActorRef<ShardManager> shardManager =
759                 TestActorRef.create(getSystem(), persistentProps);
760
761         ShardManager shardManagerActor = shardManager.underlyingActor();
762
763         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
764     }
765
766     @Test
767     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
768         final Props persistentProps = ShardManager.props(
769                 new MockClusterWrapper(),
770                 new MockConfiguration(),
771                 DatastoreContext.newBuilder().persistent(true).build(), ready);
772         final TestActorRef<ShardManager> shardManager =
773                 TestActorRef.create(getSystem(), persistentProps);
774
775         ShardManager shardManagerActor = shardManager.underlyingActor();
776         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
777                 RaftState.Follower.name(), RaftState.Leader.name()));
778
779         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
780     }
781
782     @Test
783     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
784         final Props persistentProps = ShardManager.props(
785                 new MockClusterWrapper(),
786                 new MockConfiguration(),
787                 DatastoreContext.newBuilder().persistent(true).build(), ready);
788         final TestActorRef<ShardManager> shardManager =
789                 TestActorRef.create(getSystem(), persistentProps);
790
791         ShardManager shardManagerActor = shardManager.underlyingActor();
792         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
793                 RaftState.Follower.name(), RaftState.Candidate.name()));
794
795         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
796
797         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
798         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
799
800         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
801     }
802
803     @Test
804     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
805         final Props persistentProps = ShardManager.props(
806                 new MockClusterWrapper(),
807                 new MockConfiguration(),
808                 DatastoreContext.newBuilder().persistent(true).build(), ready);
809         final TestActorRef<ShardManager> shardManager =
810                 TestActorRef.create(getSystem(), persistentProps);
811
812         ShardManager shardManagerActor = shardManager.underlyingActor();
813         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
814                 RaftState.Candidate.name(), RaftState.Follower.name()));
815
816         // Initially will be false
817         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
818
819         // Send status true will make sync status true
820         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
821
822         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
823
824         // Send status false will make sync status false
825         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
826
827         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
828
829     }
830
831     @Test
832     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
833         final Props persistentProps = ShardManager.props(
834                 new MockClusterWrapper(),
835                 new MockConfiguration() {
836                     @Override
837                     public List<String> getMemberShardNames(String memberName) {
838                         return Arrays.asList("default", "astronauts");
839                     }
840                 },
841                 DatastoreContext.newBuilder().persistent(true).build(), ready);
842         final TestActorRef<ShardManager> shardManager =
843                 TestActorRef.create(getSystem(), persistentProps);
844
845         ShardManager shardManagerActor = shardManager.underlyingActor();
846
847         // Initially will be false
848         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
849
850         // Make default shard leader
851         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
852                 RaftState.Follower.name(), RaftState.Leader.name()));
853
854         // default = Leader, astronauts is unknown so sync status remains false
855         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
856
857         // Make astronauts shard leader as well
858         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
859                 RaftState.Follower.name(), RaftState.Leader.name()));
860
861         // Now sync status should be true
862         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
863
864         // Make astronauts a Follower
865         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
866                 RaftState.Leader.name(), RaftState.Follower.name()));
867
868         // Sync status is not true
869         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
870
871         // Make the astronauts follower sync status true
872         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
873
874         // Sync status is now true
875         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
876
877     }
878
879     private static class TestShardManager extends ShardManager {
880         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
881
882         TestShardManager(String shardMrgIDSuffix) {
883             super(new MockClusterWrapper(), new MockConfiguration(),
884                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
885         }
886
887         @Override
888         public void handleRecover(Object message) throws Exception {
889             try {
890                 super.handleRecover(message);
891             } finally {
892                 if(message instanceof RecoveryCompleted) {
893                     recoveryComplete.countDown();
894                 }
895             }
896         }
897
898         void waitForRecoveryComplete() {
899             assertEquals("Recovery complete", true,
900                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
901         }
902     }
903
904     @SuppressWarnings("serial")
905     static class TestShardManagerCreator implements Creator<TestShardManager> {
906         String shardMrgIDSuffix;
907
908         TestShardManagerCreator(String shardMrgIDSuffix) {
909             this.shardMrgIDSuffix = shardMrgIDSuffix;
910         }
911
912         @Override
913         public TestShardManager create() throws Exception {
914             return new TestShardManager(shardMrgIDSuffix);
915         }
916
917     }
918
919     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
920         private static final long serialVersionUID = 1L;
921         private final Creator<ShardManager> delegate;
922
923         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
924             this.delegate = delegate;
925         }
926
927         @Override
928         public ShardManager create() throws Exception {
929             return delegate.create();
930         }
931     }
932
933     private static class ForwardingShardManager extends ShardManager {
934         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
935         private CountDownLatch memberUpReceived = new CountDownLatch(1);
936         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
937         private final ActorRef shardActor;
938         private final String name;
939
940         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
941                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
942                 ActorRef shardActor) {
943             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
944             this.shardActor = shardActor;
945             this.name = name;
946         }
947
948         @Override
949         public void handleCommand(Object message) throws Exception {
950             try{
951                 super.handleCommand(message);
952             } finally {
953                 if(message instanceof FindPrimary) {
954                     findPrimaryMessageReceived.countDown();
955                 } else if(message instanceof ClusterEvent.MemberUp) {
956                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
957                     if(!getCluster().getCurrentMemberName().equals(role)) {
958                         memberUpReceived.countDown();
959                     }
960                 } else if(message instanceof ClusterEvent.MemberRemoved) {
961                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
962                     if(!getCluster().getCurrentMemberName().equals(role)) {
963                         memberRemovedReceived.countDown();
964                     }
965                 }
966             }
967         }
968
969         @Override
970         public String persistenceId() {
971             return name;
972         }
973
974         @Override
975         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
976             return shardActor;
977         }
978
979         void waitForMemberUp() {
980             assertEquals("MemberUp received", true,
981                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
982             memberUpReceived = new CountDownLatch(1);
983         }
984
985         void waitForMemberRemoved() {
986             assertEquals("MemberRemoved received", true,
987                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
988             memberRemovedReceived = new CountDownLatch(1);
989         }
990
991         void verifyFindPrimary() {
992             assertEquals("FindPrimary received", true,
993                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
994             findPrimaryMessageReceived = new CountDownLatch(1);
995         }
996     }
997 }