Bug 2194: Find primary shard on remote ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.mock;
7 import static org.mockito.Mockito.never;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import static org.mockito.Mockito.when;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.AddressFromURIString;
14 import akka.actor.Props;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterEvent;
17 import akka.dispatch.Dispatchers;
18 import akka.japi.Creator;
19 import akka.pattern.Patterns;
20 import akka.persistence.RecoveryCompleted;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.collect.ImmutableSet;
26 import com.google.common.collect.Sets;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.typesafe.config.ConfigFactory;
29 import java.net.URI;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.Mock;
42 import org.mockito.MockitoAnnotations;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
46 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
49 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
54 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
55 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
56 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
57 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
61 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
62 import org.opendaylight.controller.cluster.raft.RaftState;
63 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import scala.concurrent.Await;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71
72 public class ShardManagerTest extends AbstractActorTest {
73     private static int ID_COUNTER = 1;
74
75     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
76     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
77
78     @Mock
79     private static CountDownLatch ready;
80
81     private static TestActorRef<MessageCollectorActor> mockShardActor;
82
83     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
84             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
85
86     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
87         String name = new ShardIdentifier(shardName, memberName,"config").toString();
88         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
89     }
90
91     @Before
92     public void setUp() {
93         MockitoAnnotations.initMocks(this);
94
95         InMemoryJournal.clear();
96
97         if(mockShardActor == null) {
98             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
99             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
100         }
101
102         mockShardActor.underlyingActor().clear();
103     }
104
105     @After
106     public void tearDown() {
107         InMemoryJournal.clear();
108     }
109
110     private Props newShardMgrProps() {
111         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
112                 datastoreContextBuilder.build(), ready);
113     }
114
115     private Props newPropsShardMgrWithMockShardActor() {
116         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
117                 new MockConfiguration());
118     }
119
120     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
121             final ClusterWrapper clusterWrapper, final Configuration config) {
122         Creator<ShardManager> creator = new Creator<ShardManager>() {
123             private static final long serialVersionUID = 1L;
124             @Override
125             public ShardManager create() throws Exception {
126                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
127                         ready, name, shardActor);
128             }
129         };
130
131         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
132     }
133
134     @Test
135     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
136         new JavaTestKit(getSystem()) {{
137             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
138
139             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
140
141             shardManager.tell(new FindPrimary("non-existent", false), getRef());
142
143             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
144         }};
145     }
146
147     @Test
148     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
149         new JavaTestKit(getSystem()) {{
150             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
151
152             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
153
154             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
155             shardManager.tell(new ActorInitialized(), mockShardActor);
156
157             shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
158
159             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
160             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
161                     RaftState.Leader.name())), mockShardActor);
162
163             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
164
165             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
166             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
167                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
168         }};
169     }
170
171     @Test
172     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
173         new JavaTestKit(getSystem()) {{
174             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
175
176             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
177             shardManager.tell(new ActorInitialized(), mockShardActor);
178
179             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
180             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
181
182             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
183             shardManager.tell(new RoleChangeNotification(memberId1,
184                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
185             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
186
187             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
188
189             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
190             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
191                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
192         }};
193     }
194
195     @Test
196     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
197         new JavaTestKit(getSystem()) {{
198             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
199
200             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
201
202             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
203         }};
204     }
205
206     @Test
207     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
208         new JavaTestKit(getSystem()) {{
209             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
210
211             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
212             shardManager.tell(new ActorInitialized(), mockShardActor);
213
214             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
215
216             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
217         }};
218     }
219
220     @Test
221     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
222         new JavaTestKit(getSystem()) {{
223             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
224
225             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
226             shardManager.tell(new ActorInitialized(), mockShardActor);
227
228             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
229             shardManager.tell(new RoleChangeNotification(memberId,
230                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
231
232             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
233
234             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
235
236             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
237
238             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
239
240             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
241             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
242                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
243         }};
244     }
245
246     @Test
247     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
248         new JavaTestKit(getSystem()) {{
249             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
250
251             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
252
253             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
254             // delayed until we send ActorInitialized and RoleChangeNotification.
255             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
256
257             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
258
259             shardManager.tell(new ActorInitialized(), mockShardActor);
260
261             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
262
263             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
264             shardManager.tell(new RoleChangeNotification(memberId,
265                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
266
267             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
268
269             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
270
271             PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
272             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
273                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
274
275             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
276         }};
277     }
278
279     @Test
280     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
281         new JavaTestKit(getSystem()) {{
282             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
283
284             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
285
286             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
287
288             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
289
290             shardManager.tell(new ActorInitialized(), mockShardActor);
291
292             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
293         }};
294     }
295
296     @Test
297     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
298         new JavaTestKit(getSystem()) {{
299             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
300
301             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
302             shardManager.tell(new ActorInitialized(), mockShardActor);
303             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
304                     null, RaftState.Candidate.name()), mockShardActor);
305
306             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
307
308             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
309         }};
310     }
311
312     @Test
313     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
314         new JavaTestKit(getSystem()) {{
315             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
316
317             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
318             shardManager.tell(new ActorInitialized(), mockShardActor);
319
320             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
321
322             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
323         }};
324     }
325
326     @Test
327     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
328         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
329
330         // Create an ActorSystem ShardManager actor for member-1.
331
332         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
333         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
334
335         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
336
337         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
338                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
339                         new MockConfiguration()), shardManagerID);
340
341         // Create an ActorSystem ShardManager actor for member-2.
342
343         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
344
345         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
346
347         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
348
349         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
350                 put("default", Arrays.asList("member-1", "member-2")).
351                 put("astronauts", Arrays.asList("member-2")).build());
352
353         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
354                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
355                         mockConfig2), shardManagerID);
356
357         new JavaTestKit(system1) {{
358
359             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
360             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361
362             shardManager2.tell(new ActorInitialized(), mockShardActor2);
363
364             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
365             shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
366             shardManager2.tell(new RoleChangeNotification(memberId2,
367                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
368
369             shardManager1.underlyingActor().waitForMemberUp();
370
371             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
372
373             PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
374             String path = found.getPrimaryPath();
375             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
376
377             shardManager2.underlyingActor().verifyFindPrimary();
378
379             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
380
381             shardManager1.underlyingActor().waitForMemberRemoved();
382
383             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
384
385             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
386         }};
387
388         JavaTestKit.shutdownActorSystem(system1);
389         JavaTestKit.shutdownActorSystem(system2);
390     }
391
392     @Test
393     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
394         new JavaTestKit(getSystem()) {{
395             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
396
397             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
398
399             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
400
401             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
402
403             assertEquals("getShardName", "non-existent", notFound.getShardName());
404         }};
405     }
406
407     @Test
408     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
409         new JavaTestKit(getSystem()) {{
410             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
411
412             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
413             shardManager.tell(new ActorInitialized(), mockShardActor);
414
415             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
416
417             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
418
419             assertTrue("Found path contains " + found.getPath().path().toString(),
420                     found.getPath().path().toString().contains("member-1-shard-default-config"));
421         }};
422     }
423
424     @Test
425     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
426         new JavaTestKit(getSystem()) {{
427             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
428
429             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
430
431             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
432         }};
433     }
434
435     @Test
436     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
437         new JavaTestKit(getSystem()) {{
438             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
439
440             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
441
442             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
443             // delayed until we send ActorInitialized.
444             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
445                     new Timeout(5, TimeUnit.SECONDS));
446
447             shardManager.tell(new ActorInitialized(), mockShardActor);
448
449             Object resp = Await.result(future, duration("5 seconds"));
450             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
451         }};
452     }
453
454     @Test
455     public void testOnRecoveryJournalIsCleaned() {
456         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
457                 ImmutableSet.of("foo")));
458         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
459                 ImmutableSet.of("bar")));
460         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
461
462         new JavaTestKit(getSystem()) {{
463             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
464                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
465
466             shardManager.underlyingActor().waitForRecoveryComplete();
467             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
468
469             // Journal entries up to the last one should've been deleted
470             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
471             synchronized (journal) {
472                 assertEquals("Journal size", 1, journal.size());
473                 assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
474             }
475         }};
476     }
477
478     @Test
479     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
480         final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
481         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
482                 persistedModules));
483         new JavaTestKit(getSystem()) {{
484             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
485                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
486
487             shardManager.underlyingActor().waitForRecoveryComplete();
488
489             Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
490
491             assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
492         }};
493     }
494
495     @Test
496     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
497             throws Exception {
498         new JavaTestKit(getSystem()) {{
499             final TestActorRef<ShardManager> shardManager =
500                     TestActorRef.create(getSystem(), newShardMgrProps());
501
502             assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
503
504             ModuleIdentifier foo = mock(ModuleIdentifier.class);
505             when(foo.getNamespace()).thenReturn(new URI("foo"));
506
507             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
508             moduleIdentifierSet.add(foo);
509
510             SchemaContext schemaContext = mock(SchemaContext.class);
511             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
512
513             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
514
515             assertEquals("getKnownModules", Sets.newHashSet("foo"),
516                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
517
518             ModuleIdentifier bar = mock(ModuleIdentifier.class);
519             when(bar.getNamespace()).thenReturn(new URI("bar"));
520
521             moduleIdentifierSet.add(bar);
522
523             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
524
525             assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
526                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
527         }};
528     }
529
530     @Test
531     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
532             throws Exception {
533         new JavaTestKit(getSystem()) {{
534             final TestActorRef<ShardManager> shardManager =
535                     TestActorRef.create(getSystem(), newShardMgrProps());
536
537             SchemaContext schemaContext = mock(SchemaContext.class);
538             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
539
540             ModuleIdentifier foo = mock(ModuleIdentifier.class);
541             when(foo.getNamespace()).thenReturn(new URI("foo"));
542
543             moduleIdentifierSet.add(foo);
544
545             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
546
547             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
548
549             assertEquals("getKnownModules", Sets.newHashSet("foo"),
550                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
551
552             //Create a completely different SchemaContext with only the bar module in it
553             //schemaContext = mock(SchemaContext.class);
554             moduleIdentifierSet.clear();
555             ModuleIdentifier bar = mock(ModuleIdentifier.class);
556             when(bar.getNamespace()).thenReturn(new URI("bar"));
557
558             moduleIdentifierSet.add(bar);
559
560             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
561
562             assertEquals("getKnownModules", Sets.newHashSet("foo"),
563                     Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
564
565         }};
566     }
567
568     @Test
569     public void testRecoveryApplicable(){
570         new JavaTestKit(getSystem()) {
571             {
572                 final Props persistentProps = ShardManager.props(
573                         new MockClusterWrapper(),
574                         new MockConfiguration(),
575                         DatastoreContext.newBuilder().persistent(true).build(), ready);
576                 final TestActorRef<ShardManager> persistentShardManager =
577                         TestActorRef.create(getSystem(), persistentProps);
578
579                 DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
580
581                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
582
583                 final Props nonPersistentProps = ShardManager.props(
584                         new MockClusterWrapper(),
585                         new MockConfiguration(),
586                         DatastoreContext.newBuilder().persistent(false).build(), ready);
587                 final TestActorRef<ShardManager> nonPersistentShardManager =
588                         TestActorRef.create(getSystem(), nonPersistentProps);
589
590                 DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
591
592                 assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
593
594
595             }};
596
597     }
598
599     @Test
600     public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
601             throws Exception {
602         final CountDownLatch persistLatch = new CountDownLatch(1);
603         final Creator<ShardManager> creator = new Creator<ShardManager>() {
604             private static final long serialVersionUID = 1L;
605             @Override
606             public ShardManager create() throws Exception {
607                 return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
608                     @Override
609                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
610                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
611                                 = new DataPersistenceProviderMonitor();
612                         dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
613                         return dataPersistenceProviderMonitor;
614                     }
615                 };
616             }
617         };
618
619         new JavaTestKit(getSystem()) {{
620
621             final TestActorRef<ShardManager> shardManager =
622                     TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
623
624             ModuleIdentifier foo = mock(ModuleIdentifier.class);
625             when(foo.getNamespace()).thenReturn(new URI("foo"));
626
627             Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
628             moduleIdentifierSet.add(foo);
629
630             SchemaContext schemaContext = mock(SchemaContext.class);
631             when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
632
633             shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
634
635             assertEquals("Persisted", true,
636                     Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));
637
638         }};
639     }
640
641     @Test
642     public void testRoleChangeNotificationReleaseReady() throws Exception {
643         new JavaTestKit(getSystem()) {
644             {
645                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
646
647                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
648                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
649                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
650
651                 verify(ready, times(1)).countDown();
652
653             }};
654     }
655
656     @Test
657     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
658         new JavaTestKit(getSystem()) {
659             {
660                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
661
662                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
663                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
664
665                 verify(ready, never()).countDown();
666
667             }};
668     }
669
670
671     @Test
672     public void testByDefaultSyncStatusIsFalse() throws Exception{
673         final Props persistentProps = ShardManager.props(
674                 new MockClusterWrapper(),
675                 new MockConfiguration(),
676                 DatastoreContext.newBuilder().persistent(true).build(), ready);
677         final TestActorRef<ShardManager> shardManager =
678                 TestActorRef.create(getSystem(), persistentProps);
679
680         ShardManager shardManagerActor = shardManager.underlyingActor();
681
682         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
683     }
684
685     @Test
686     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
687         final Props persistentProps = ShardManager.props(
688                 new MockClusterWrapper(),
689                 new MockConfiguration(),
690                 DatastoreContext.newBuilder().persistent(true).build(), ready);
691         final TestActorRef<ShardManager> shardManager =
692                 TestActorRef.create(getSystem(), persistentProps);
693
694         ShardManager shardManagerActor = shardManager.underlyingActor();
695         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
696                 RaftState.Follower.name(), RaftState.Leader.name()));
697
698         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
699     }
700
701     @Test
702     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
703         final Props persistentProps = ShardManager.props(
704                 new MockClusterWrapper(),
705                 new MockConfiguration(),
706                 DatastoreContext.newBuilder().persistent(true).build(), ready);
707         final TestActorRef<ShardManager> shardManager =
708                 TestActorRef.create(getSystem(), persistentProps);
709
710         ShardManager shardManagerActor = shardManager.underlyingActor();
711         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
712                 RaftState.Follower.name(), RaftState.Candidate.name()));
713
714         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
715
716         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
717         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
718
719         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
720     }
721
722     @Test
723     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
724         final Props persistentProps = ShardManager.props(
725                 new MockClusterWrapper(),
726                 new MockConfiguration(),
727                 DatastoreContext.newBuilder().persistent(true).build(), ready);
728         final TestActorRef<ShardManager> shardManager =
729                 TestActorRef.create(getSystem(), persistentProps);
730
731         ShardManager shardManagerActor = shardManager.underlyingActor();
732         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
733                 RaftState.Candidate.name(), RaftState.Follower.name()));
734
735         // Initially will be false
736         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
737
738         // Send status true will make sync status true
739         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
740
741         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
742
743         // Send status false will make sync status false
744         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
745
746         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
747
748     }
749
750     @Test
751     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
752         final Props persistentProps = ShardManager.props(
753                 new MockClusterWrapper(),
754                 new MockConfiguration() {
755                     @Override
756                     public List<String> getMemberShardNames(String memberName) {
757                         return Arrays.asList("default", "astronauts");
758                     }
759                 },
760                 DatastoreContext.newBuilder().persistent(true).build(), ready);
761         final TestActorRef<ShardManager> shardManager =
762                 TestActorRef.create(getSystem(), persistentProps);
763
764         ShardManager shardManagerActor = shardManager.underlyingActor();
765
766         // Initially will be false
767         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
768
769         // Make default shard leader
770         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
771                 RaftState.Follower.name(), RaftState.Leader.name()));
772
773         // default = Leader, astronauts is unknown so sync status remains false
774         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
775
776         // Make astronauts shard leader as well
777         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
778                 RaftState.Follower.name(), RaftState.Leader.name()));
779
780         // Now sync status should be true
781         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
782
783         // Make astronauts a Follower
784         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
785                 RaftState.Leader.name(), RaftState.Follower.name()));
786
787         // Sync status is not true
788         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
789
790         // Make the astronauts follower sync status true
791         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
792
793         // Sync status is now true
794         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
795
796     }
797
798     private static class TestShardManager extends ShardManager {
799         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
800
801         TestShardManager(String shardMrgIDSuffix) {
802             super(new MockClusterWrapper(), new MockConfiguration(),
803                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
804         }
805
806         @Override
807         public void handleRecover(Object message) throws Exception {
808             try {
809                 super.handleRecover(message);
810             } finally {
811                 if(message instanceof RecoveryCompleted) {
812                     recoveryComplete.countDown();
813                 }
814             }
815         }
816
817         void waitForRecoveryComplete() {
818             assertEquals("Recovery complete", true,
819                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
820         }
821     }
822
823     @SuppressWarnings("serial")
824     static class TestShardManagerCreator implements Creator<TestShardManager> {
825         String shardMrgIDSuffix;
826
827         TestShardManagerCreator(String shardMrgIDSuffix) {
828             this.shardMrgIDSuffix = shardMrgIDSuffix;
829         }
830
831         @Override
832         public TestShardManager create() throws Exception {
833             return new TestShardManager(shardMrgIDSuffix);
834         }
835
836     }
837
838     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
839         private static final long serialVersionUID = 1L;
840         private final Creator<ShardManager> delegate;
841
842         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
843             this.delegate = delegate;
844         }
845
846         @Override
847         public ShardManager create() throws Exception {
848             return delegate.create();
849         }
850     }
851
852     private static class ForwardingShardManager extends ShardManager {
853         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
854         private CountDownLatch memberUpReceived = new CountDownLatch(1);
855         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
856         private final ActorRef shardActor;
857         private final String name;
858
859         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
860                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
861                 ActorRef shardActor) {
862             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
863             this.shardActor = shardActor;
864             this.name = name;
865         }
866
867         @Override
868         public void handleCommand(Object message) throws Exception {
869             try{
870                 super.handleCommand(message);
871             } finally {
872                 if(message instanceof FindPrimary) {
873                     findPrimaryMessageReceived.countDown();
874                 } else if(message instanceof ClusterEvent.MemberUp) {
875                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
876                     if(!getCluster().getCurrentMemberName().equals(role)) {
877                         memberUpReceived.countDown();
878                     }
879                 } else if(message instanceof ClusterEvent.MemberRemoved) {
880                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
881                     if(!getCluster().getCurrentMemberName().equals(role)) {
882                         memberRemovedReceived.countDown();
883                     }
884                 }
885             }
886         }
887
888         @Override
889         public String persistenceId() {
890             return name;
891         }
892
893         @Override
894         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
895             return shardActor;
896         }
897
898         void waitForMemberUp() {
899             assertEquals("MemberUp received", true,
900                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
901             memberUpReceived = new CountDownLatch(1);
902         }
903
904         void waitForMemberRemoved() {
905             assertEquals("MemberRemoved received", true,
906                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
907             memberRemovedReceived = new CountDownLatch(1);
908         }
909
910         void verifyFindPrimary() {
911             assertEquals("FindPrimary received", true,
912                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
913             findPrimaryMessageReceived = new CountDownLatch(1);
914         }
915     }
916 }