Merge "BUG-1567 Expose sources for submodules in netconf"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.AddressFromURIString;
14 import akka.actor.Props;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterEvent;
17 import akka.dispatch.Dispatchers;
18 import akka.japi.Creator;
19 import akka.pattern.Patterns;
20 import akka.persistence.RecoveryCompleted;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.collect.ImmutableSet;
26 import com.google.common.collect.Sets;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.typesafe.config.ConfigFactory;
29 import java.net.URI;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.Mock;
42 import org.mockito.MockitoAnnotations;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
46 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
49 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
54 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
55 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
56 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
57 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
61 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
62 import org.opendaylight.controller.cluster.raft.RaftState;
63 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import scala.concurrent.Await;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71
72 public class ShardManagerTest extends AbstractActorTest {
73     private static int ID_COUNTER = 1;
74
75     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
76     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
77
78     @Mock
79     private static CountDownLatch ready;
80
81     private static TestActorRef<MessageCollectorActor> mockShardActor;
82
83     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
84             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
85
86     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
87         String name = new ShardIdentifier(shardName, memberName,"config").toString();
88         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
89     }
90
91     @Before
92     public void setUp() {
93         MockitoAnnotations.initMocks(this);
94
95         InMemoryJournal.clear();
96
97         if(mockShardActor == null) {
98             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
99             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
100         }
101
102         mockShardActor.underlyingActor().clear();
103     }
104
105     @After
106     public void tearDown() {
107         InMemoryJournal.clear();
108     }
109
110     private Props newShardMgrProps() {
111         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
112                 datastoreContextBuilder.build(), ready);
113     }
114
115     private Props newPropsShardMgrWithMockShardActor() {
116         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
117                 new MockConfiguration());
118     }
119
120     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
121             final ClusterWrapper clusterWrapper, final Configuration config) {
122         Creator<ShardManager> creator = new Creator<ShardManager>() {
123             private static final long serialVersionUID = 1L;
124             @Override
125             public ShardManager create() throws Exception {
126                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
127                         ready, name, shardActor);
128             }
129         };
130
131         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
132     }
133
134     @Test
135     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
136         new JavaTestKit(getSystem()) {{
137             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
138
139             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
140
141             shardManager.tell(new FindPrimary("non-existent", false), getRef());
142
143             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
144         }};
145     }
146
147     @Test
148     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
149         new JavaTestKit(getSystem()) {{
150             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
151
152             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
153
154             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
155             shardManager.tell(new ActorInitialized(), mockShardActor);
156
157             shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
158
159             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
160             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
161                     RaftState.Leader.name())), mockShardActor);
162
163             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
164
165             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
166             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
167                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
168         }};
169     }
170
171     @Test
172     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
173         new JavaTestKit(getSystem()) {{
174             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
175
176             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
177             shardManager.tell(new ActorInitialized(), mockShardActor);
178
179             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
180             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
181
182             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
183             shardManager.tell(new RoleChangeNotification(memberId1,
184                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
185             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
186
187             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
188
189             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
190             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
191                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
192         }};
193     }
194
195     @Test
196     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
197         new JavaTestKit(getSystem()) {{
198             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
199
200             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
201
202             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
203         }};
204     }
205
206     @Test
207     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
208         new JavaTestKit(getSystem()) {{
209             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
210
211             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
212             shardManager.tell(new ActorInitialized(), mockShardActor);
213
214             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
215
216             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
217         }};
218     }
219
220     @Test
221     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
222         new JavaTestKit(getSystem()) {{
223             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
224
225             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
226             shardManager.tell(new ActorInitialized(), mockShardActor);
227
228             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
229             shardManager.tell(new RoleChangeNotification(memberId,
230                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
231
232             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
233
234             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
235
236             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
237
238             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
239
240             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
241             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
242                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
243         }};
244     }
245
246     @Test
247     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
248         new JavaTestKit(getSystem()) {{
249             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
250
251             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
252
253             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
254             // delayed until we send ActorInitialized and RoleChangeNotification.
255             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
256
257             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
258
259             shardManager.tell(new ActorInitialized(), mockShardActor);
260
261             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
262
263             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
264             shardManager.tell(new RoleChangeNotification(memberId,
265                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
266
267             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
268
269             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
270
271             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
272             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
273                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
274
275             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
276         }};
277     }
278
279     @Test
280     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
281         new JavaTestKit(getSystem()) {{
282             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
283
284             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
285
286             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
287
288             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
289
290             shardManager.tell(new ActorInitialized(), mockShardActor);
291
292             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
293         }};
294     }
295
296     @Test
297     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
298         new JavaTestKit(getSystem()) {{
299             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
300
301             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
302             shardManager.tell(new ActorInitialized(), mockShardActor);
303             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
304                     null, RaftState.Candidate.name()), mockShardActor);
305
306             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
307
308             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
309         }};
310     }
311
312     @Test
313     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
314         new JavaTestKit(getSystem()) {{
315             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
316
317             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
318             shardManager.tell(new ActorInitialized(), mockShardActor);
319
320             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
321
322             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
323         }};
324     }
325
326     @Test
327     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
328         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
329
330         // Create an ActorSystem ShardManager actor for member-1.
331
332         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
333         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
334
335         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
336
337         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
338                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
339                         new MockConfiguration()), shardManagerID);
340
341         // Create an ActorSystem ShardManager actor for member-2.
342
343         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
344
345         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
346
347         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
348
349         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
350                 put("default", Arrays.asList("member-1", "member-2")).
351                 put("astronauts", Arrays.asList("member-2")).build());
352
353         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
354                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
355                         mockConfig2), shardManagerID);
356
357         new JavaTestKit(system1) {{
358
359             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
360             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361
362             shardManager2.tell(new ActorInitialized(), mockShardActor2);
363
364             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
365             shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
366             shardManager2.tell(new RoleChangeNotification(memberId2,
367                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
368
369             shardManager1.underlyingActor().waitForMemberUp();
370
371             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
372
373             PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
374             String path = found.getPrimaryPath();
375             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
376
377             shardManager2.underlyingActor().verifyFindPrimary();
378
379             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
380
381             shardManager1.underlyingActor().waitForMemberRemoved();
382
383             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
384
385             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
386         }};
387
388         JavaTestKit.shutdownActorSystem(system1);
389         JavaTestKit.shutdownActorSystem(system2);
390     }
391
392     @Test
393     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
394         new JavaTestKit(getSystem()) {{
395             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
396
397             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
398
399             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
400
401             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
402
403             assertEquals("getShardName", "non-existent", notFound.getShardName());
404         }};
405     }
406
407     @Test
408     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
409         new JavaTestKit(getSystem()) {{
410             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
411
412             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
413             shardManager.tell(new ActorInitialized(), mockShardActor);
414
415             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
416
417             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
418
419             assertTrue("Found path contains " + found.getPath().path().toString(),
420                     found.getPath().path().toString().contains("member-1-shard-default-config"));
421         }};
422     }
423
424     @Test
425     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
426         new JavaTestKit(getSystem()) {{
427             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
428
429             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
430
431             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
432         }};
433     }
434
435     @Test
436     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
437         new JavaTestKit(getSystem()) {{
438             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
439
440             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
441
442             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
443             // delayed until we send ActorInitialized.
444             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
445                     new Timeout(5, TimeUnit.SECONDS));
446
447             shardManager.tell(new ActorInitialized(), mockShardActor);
448
449             Object resp = Await.result(future, duration("5 seconds"));
450             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
451         }};
452     }
453
454     @Test
455     public void testOnRecoveryJournalIsCleaned() {
456         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
457                 ImmutableSet.of("foo")));
458         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
459                 ImmutableSet.of("bar")));
460         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
461
462         new JavaTestKit(getSystem()) {{
463             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
464                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
465
466             shardManager.underlyingActor().waitForRecoveryComplete();
467             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
468
469             // Journal entries up to the last one should've been deleted
470             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
471             synchronized (journal) {
472                 assertEquals("Journal size", 1, journal.size());
473                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
474             }
475         }};
476     }
477
478     @Test
479     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
480         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
481         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
482                 persistedModules));
483         new JavaTestKit(getSystem()) {{
484             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
485                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
486
487             shardManager.underlyingActor().waitForRecoveryComplete();
488
489             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
490
491             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
492         }};
493     }
494
495     @Test
496     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
497             throws Exception {
498         new JavaTestKit(getSystem()) {{
499             final TestActorRef<ShardManager> shardManager =
500                     TestActorRef.create(getSystem(), newShardMgrProps());
501
502             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
503
504             ModuleIdentifier foo = mock(ModuleIdentifier.class);
505             when(foo.getNamespace()).thenReturn(new URI("foo"));
506
507             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
508             moduleIdentifierSet.add(foo);
509
510             SchemaContext schemaContext = mock(SchemaContext.class);
511             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
512
513             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
514
515             assertEquals("getKnownModules", Sets.newHashSet("foo"),
516                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
517
518             ModuleIdentifier bar = mock(ModuleIdentifier.class);
519             when(bar.getNamespace()).thenReturn(new URI("bar"));
520
521             moduleIdentifierSet.add(bar);
522
523             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
524
525             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
526                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
527         }};
528     }
529
530     @Test
531     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
532             throws Exception {
533         new JavaTestKit(getSystem()) {{
534             final TestActorRef<ShardManager> shardManager =
535                     TestActorRef.create(getSystem(), newShardMgrProps());
536
537             SchemaContext schemaContext = mock(SchemaContext.class);
538             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
539
540             ModuleIdentifier foo = mock(ModuleIdentifier.class);
541             when(foo.getNamespace()).thenReturn(new URI("foo"));
542
543             moduleIdentifierSet.add(foo);
544
545             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
546
547             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
548
549             assertEquals("getKnownModules", Sets.newHashSet("foo"),
550                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
551
552             //Create a completely different SchemaContext with only the bar module in it
553             //schemaContext = mock(SchemaContext.class);
554             moduleIdentifierSet.clear();
555             ModuleIdentifier bar = mock(ModuleIdentifier.class);
556             when(bar.getNamespace()).thenReturn(new URI("bar"));
557
558             moduleIdentifierSet.add(bar);
559
560             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
561
562             assertEquals("getKnownModules", Sets.newHashSet("foo"),
563                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
564
565         }};
566     }
567
568     @Test
569     public void testRecoveryApplicable(){
570         new JavaTestKit(getSystem()) {
571             {
572                 final Props persistentProps = ShardManager.props(
573                         new MockClusterWrapper(),
574                         new MockConfiguration(),
575                         DatastoreContext.newBuilder().persistent(true).build(), ready);
576                 final TestActorRef<ShardManager> persistentShardManager =
577                         TestActorRef.create(getSystem(), persistentProps);
578
579                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
580
581                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
582
583                 final Props nonPersistentProps = ShardManager.props(
584                         new MockClusterWrapper(),
585                         new MockConfiguration(),
586                         DatastoreContext.newBuilder().persistent(false).build(), ready);
587                 final TestActorRef<ShardManager> nonPersistentShardManager =
588                         TestActorRef.create(getSystem(), nonPersistentProps);
589
590                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
591
592                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
593
594
595             }};
596
597     }
598
599     @Test
600     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
601             throws Exception {
602         final CountDownLatch persistLatch = new CountDownLatch(1);
603         final Creator<ShardManager> creator = new Creator<ShardManager>() {
604             private static final long serialVersionUID = 1L;
605             @Override
606             public ShardManager create() throws Exception {
607                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
608                     @Override
609                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
610                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
611                                 = new DataPersistenceProviderMonitor();
612                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
613                         return dataPersistenceProviderMonitor;
614                     }
615                 };
616             }
617         };
618
619         new JavaTestKit(getSystem()) {{
620
621             final TestActorRef<ShardManager> shardManager =
622                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
623
624             ModuleIdentifier foo = mock(ModuleIdentifier.class);
625             when(foo.getNamespace()).thenReturn(new URI("foo"));
626
627             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
628             moduleIdentifierSet.add(foo);
629
630             SchemaContext schemaContext = mock(SchemaContext.class);
631             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
632
633             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
634
635             assertEquals("Persisted", true,
636                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
637
638         }};
639     }
640
641     @Test
642     public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
643         new JavaTestKit(getSystem()) {
644             {
645                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
646
647                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
648                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
649                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
650
651                 verify(ready, never()).countDown();
652
653                 shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
654
655                 verify(ready, times(1)).countDown();
656
657             }};
658     }
659
660     @Test
661     public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
662         new JavaTestKit(getSystem()) {
663             {
664                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
665
666                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
667                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
668                         memberId, null, RaftState.Follower.name()));
669
670                 verify(ready, never()).countDown();
671
672                 shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
673
674                 verify(ready, times(1)).countDown();
675
676             }};
677     }
678
679
680     @Test
681     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
682         new JavaTestKit(getSystem()) {
683             {
684                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
685
686                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
687                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
688
689                 verify(ready, never()).countDown();
690
691             }};
692     }
693
694
695     @Test
696     public void testByDefaultSyncStatusIsFalse() throws Exception{
697         final Props persistentProps = ShardManager.props(
698                 new MockClusterWrapper(),
699                 new MockConfiguration(),
700                 DatastoreContext.newBuilder().persistent(true).build(), ready);
701         final TestActorRef<ShardManager> shardManager =
702                 TestActorRef.create(getSystem(), persistentProps);
703
704         ShardManager shardManagerActor = shardManager.underlyingActor();
705
706         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
707     }
708
709     @Test
710     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
711         final Props persistentProps = ShardManager.props(
712                 new MockClusterWrapper(),
713                 new MockConfiguration(),
714                 DatastoreContext.newBuilder().persistent(true).build(), ready);
715         final TestActorRef<ShardManager> shardManager =
716                 TestActorRef.create(getSystem(), persistentProps);
717
718         ShardManager shardManagerActor = shardManager.underlyingActor();
719         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
720                 RaftState.Follower.name(), RaftState.Leader.name()));
721
722         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
723     }
724
725     @Test
726     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
727         final Props persistentProps = ShardManager.props(
728                 new MockClusterWrapper(),
729                 new MockConfiguration(),
730                 DatastoreContext.newBuilder().persistent(true).build(), ready);
731         final TestActorRef<ShardManager> shardManager =
732                 TestActorRef.create(getSystem(), persistentProps);
733
734         ShardManager shardManagerActor = shardManager.underlyingActor();
735         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
736                 RaftState.Follower.name(), RaftState.Candidate.name()));
737
738         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
739
740         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
741         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
742
743         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
744     }
745
746     @Test
747     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
748         final Props persistentProps = ShardManager.props(
749                 new MockClusterWrapper(),
750                 new MockConfiguration(),
751                 DatastoreContext.newBuilder().persistent(true).build(), ready);
752         final TestActorRef<ShardManager> shardManager =
753                 TestActorRef.create(getSystem(), persistentProps);
754
755         ShardManager shardManagerActor = shardManager.underlyingActor();
756         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
757                 RaftState.Candidate.name(), RaftState.Follower.name()));
758
759         // Initially will be false
760         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
761
762         // Send status true will make sync status true
763         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
764
765         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
766
767         // Send status false will make sync status false
768         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
769
770         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
771
772     }
773
774     @Test
775     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
776         final Props persistentProps = ShardManager.props(
777                 new MockClusterWrapper(),
778                 new MockConfiguration() {
779                     @Override
780                     public List<String> getMemberShardNames(String memberName) {
781                         return Arrays.asList("default", "astronauts");
782                     }
783                 },
784                 DatastoreContext.newBuilder().persistent(true).build(), ready);
785         final TestActorRef<ShardManager> shardManager =
786                 TestActorRef.create(getSystem(), persistentProps);
787
788         ShardManager shardManagerActor = shardManager.underlyingActor();
789
790         // Initially will be false
791         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
792
793         // Make default shard leader
794         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
795                 RaftState.Follower.name(), RaftState.Leader.name()));
796
797         // default = Leader, astronauts is unknown so sync status remains false
798         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
799
800         // Make astronauts shard leader as well
801         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
802                 RaftState.Follower.name(), RaftState.Leader.name()));
803
804         // Now sync status should be true
805         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
806
807         // Make astronauts a Follower
808         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
809                 RaftState.Leader.name(), RaftState.Follower.name()));
810
811         // Sync status is not true
812         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
813
814         // Make the astronauts follower sync status true
815         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
816
817         // Sync status is now true
818         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
819
820     }
821
822     private static class TestShardManager extends ShardManager {
823         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
824
825         TestShardManager(String shardMrgIDSuffix) {
826             super(new MockClusterWrapper(), new MockConfiguration(),
827                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
828         }
829
830         @Override
831         public void handleRecover(Object message) throws Exception {
832             try {
833                 super.handleRecover(message);
834             } finally {
835                 if(message instanceof RecoveryCompleted) {
836                     recoveryComplete.countDown();
837                 }
838             }
839         }
840
841         void waitForRecoveryComplete() {
842             assertEquals("Recovery complete", true,
843                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
844         }
845     }
846
847     @SuppressWarnings("serial")
848     static class TestShardManagerCreator implements Creator<TestShardManager> {
849         String shardMrgIDSuffix;
850
851         TestShardManagerCreator(String shardMrgIDSuffix) {
852             this.shardMrgIDSuffix = shardMrgIDSuffix;
853         }
854
855         @Override
856         public TestShardManager create() throws Exception {
857             return new TestShardManager(shardMrgIDSuffix);
858         }
859
860     }
861
862     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
863         private static final long serialVersionUID = 1L;
864         private final Creator<ShardManager> delegate;
865
866         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
867             this.delegate = delegate;
868         }
869
870         @Override
871         public ShardManager create() throws Exception {
872             return delegate.create();
873         }
874     }
875
876     private static class ForwardingShardManager extends ShardManager {
877         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
878         private CountDownLatch memberUpReceived = new CountDownLatch(1);
879         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
880         private final ActorRef shardActor;
881         private final String name;
882
883         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
884                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
885                 ActorRef shardActor) {
886             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
887             this.shardActor = shardActor;
888             this.name = name;
889         }
890
891         @Override
892         public void handleCommand(Object message) throws Exception {
893             try{
894                 super.handleCommand(message);
895             } finally {
896                 if(message instanceof FindPrimary) {
897                     findPrimaryMessageReceived.countDown();
898                 } else if(message instanceof ClusterEvent.MemberUp) {
899                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
900                     if(!getCluster().getCurrentMemberName().equals(role)) {
901                         memberUpReceived.countDown();
902                     }
903                 } else if(message instanceof ClusterEvent.MemberRemoved) {
904                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
905                     if(!getCluster().getCurrentMemberName().equals(role)) {
906                         memberRemovedReceived.countDown();
907                     }
908                 }
909             }
910         }
911
912         @Override
913         public String persistenceId() {
914             return name;
915         }
916
917         @Override
918         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
919             return shardActor;
920         }
921
922         void waitForMemberUp() {
923             assertEquals("MemberUp received", true,
924                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
925             memberUpReceived = new CountDownLatch(1);
926         }
927
928         void waitForMemberRemoved() {
929             assertEquals("MemberRemoved received", true,
930                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
931             memberRemovedReceived = new CountDownLatch(1);
932         }
933
934         void verifyFindPrimary() {
935             assertEquals("FindPrimary received", true,
936                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
937             findPrimaryMessageReceived = new CountDownLatch(1);
938         }
939     }
940 }