Bug 4105: Pass ModuleShardConfiguration with CreateShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.cluster.Cluster;
25 import akka.cluster.ClusterEvent;
26 import akka.dispatch.Dispatchers;
27 import akka.japi.Creator;
28 import akka.pattern.Patterns;
29 import akka.persistence.RecoveryCompleted;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.ImmutableSet;
36 import com.google.common.collect.Sets;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import com.typesafe.config.ConfigFactory;
39 import java.net.URI;
40 import java.util.Arrays;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
52 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
53 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
57 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
59 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
60 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
62 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
63 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
64 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
65 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
66 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
67 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
68 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
69 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
70 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
71 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
72 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
73 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
74 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
75 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
76 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
77 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
78 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
79 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
80 import org.opendaylight.controller.cluster.raft.RaftState;
81 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
82 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
83 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
84 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
85 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
86 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
87 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
88 import scala.concurrent.Await;
89 import scala.concurrent.Future;
90 import scala.concurrent.duration.FiniteDuration;
91
92 public class ShardManagerTest extends AbstractActorTest {
93     private static int ID_COUNTER = 1;
94
95     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
96     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
97
98     @Mock
99     private static CountDownLatch ready;
100
101     private static TestActorRef<MessageCollectorActor> mockShardActor;
102
103     private static String mockShardName;
104
105     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
106             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
107                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
108
109     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
110         String name = new ShardIdentifier(shardName, memberName,"config").toString();
111         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
112     }
113
114     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
115
116     @Before
117     public void setUp() {
118         MockitoAnnotations.initMocks(this);
119
120         InMemoryJournal.clear();
121
122         if(mockShardActor == null) {
123             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
124             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
125         }
126
127         mockShardActor.underlyingActor().clear();
128     }
129
130     @After
131     public void tearDown() {
132         InMemoryJournal.clear();
133     }
134
135     private Props newShardMgrProps() {
136         return newShardMgrProps(new MockConfiguration());
137     }
138
139     private Props newShardMgrProps(Configuration config) {
140         return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready,
141                 primaryShardInfoCache);
142     }
143
144     private Props newPropsShardMgrWithMockShardActor() {
145         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
146                 new MockConfiguration());
147     }
148
149     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
150             final ClusterWrapper clusterWrapper, final Configuration config) {
151         Creator<ShardManager> creator = new Creator<ShardManager>() {
152             private static final long serialVersionUID = 1L;
153             @Override
154             public ShardManager create() throws Exception {
155                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
156                         ready, name, shardActor, primaryShardInfoCache);
157             }
158         };
159
160         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
161     }
162
163     @Test
164     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
165         new JavaTestKit(getSystem()) {{
166             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
167
168             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
169
170             shardManager.tell(new FindPrimary("non-existent", false), getRef());
171
172             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
173         }};
174     }
175
176     @Test
177     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
178         new JavaTestKit(getSystem()) {{
179             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
180
181             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
182
183             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
184             shardManager.tell(new ActorInitialized(), mockShardActor);
185
186             DataTree mockDataTree = mock(DataTree.class);
187             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
188                     DataStoreVersions.CURRENT_VERSION), getRef());
189
190             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
191             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
192                     RaftState.Leader.name())), mockShardActor);
193
194             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
195
196             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
197             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
198                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
199             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
200         }};
201     }
202
203     @Test
204     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
205         new JavaTestKit(getSystem()) {{
206             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
207
208             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
209             shardManager.tell(new ActorInitialized(), mockShardActor);
210
211             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
212             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
213             shardManager.tell(new RoleChangeNotification(memberId1,
214                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
215             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
216
217             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
218
219             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
220         }};
221     }
222
223     @Test
224     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
225         new JavaTestKit(getSystem()) {{
226             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
227
228             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
229             shardManager.tell(new ActorInitialized(), mockShardActor);
230
231             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
232             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
233
234             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
235             shardManager.tell(new RoleChangeNotification(memberId1,
236                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
237             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
238             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
239                     leaderVersion), mockShardActor);
240
241             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
242
243             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
244             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
245                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
246             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
247         }};
248     }
249
250     @Test
251     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
252         new JavaTestKit(getSystem()) {{
253             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
254
255             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
256
257             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
258         }};
259     }
260
261     @Test
262     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
263         new JavaTestKit(getSystem()) {{
264             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
265
266             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
267             shardManager.tell(new ActorInitialized(), mockShardActor);
268
269             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
270
271             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
272         }};
273     }
274
275     @Test
276     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
277         new JavaTestKit(getSystem()) {{
278             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
279
280             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
281             shardManager.tell(new ActorInitialized(), mockShardActor);
282
283             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
284             shardManager.tell(new RoleChangeNotification(memberId,
285                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
286
287             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
288
289             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
290
291             DataTree mockDataTree = mock(DataTree.class);
292             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
293                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
294
295             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
296
297             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
298             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
299                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
300             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
301         }};
302     }
303
304     @Test
305     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
306         new JavaTestKit(getSystem()) {{
307             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
308
309             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
310
311             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
312             // delayed until we send ActorInitialized and RoleChangeNotification.
313             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
314
315             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
316
317             shardManager.tell(new ActorInitialized(), mockShardActor);
318
319             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
320
321             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
322             shardManager.tell(new RoleChangeNotification(memberId,
323                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
324
325             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
326
327             DataTree mockDataTree = mock(DataTree.class);
328             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
329                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
330
331             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
332             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
333                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
334             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
335
336             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
337         }};
338     }
339
340     @Test
341     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
342         new JavaTestKit(getSystem()) {{
343             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
344
345             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
346
347             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
348
349             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
350
351             shardManager.tell(new ActorInitialized(), mockShardActor);
352
353             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
354         }};
355     }
356
357     @Test
358     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
359         new JavaTestKit(getSystem()) {{
360             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
361
362             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
363             shardManager.tell(new ActorInitialized(), mockShardActor);
364             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
365                     null, RaftState.Candidate.name()), mockShardActor);
366
367             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
368
369             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
370         }};
371     }
372
373     @Test
374     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
375         new JavaTestKit(getSystem()) {{
376             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
377
378             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
379             shardManager.tell(new ActorInitialized(), mockShardActor);
380             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
381                     null, RaftState.IsolatedLeader.name()), mockShardActor);
382
383             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
384
385             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
386         }};
387     }
388
389     @Test
390     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
391         new JavaTestKit(getSystem()) {{
392             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
393
394             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
395             shardManager.tell(new ActorInitialized(), mockShardActor);
396
397             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
398
399             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
400         }};
401     }
402
403     @Test
404     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
405         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
406
407         // Create an ActorSystem ShardManager actor for member-1.
408
409         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
410         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
411
412         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
413
414         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
415                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
416                         new MockConfiguration()), shardManagerID);
417
418         // Create an ActorSystem ShardManager actor for member-2.
419
420         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
421
422         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
423
424         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
425
426         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
427                 put("default", Arrays.asList("member-1", "member-2")).
428                 put("astronauts", Arrays.asList("member-2")).build());
429
430         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
431                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
432                         mockConfig2), shardManagerID);
433
434         new JavaTestKit(system1) {{
435
436             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
437             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
438
439             shardManager2.tell(new ActorInitialized(), mockShardActor2);
440
441             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
442             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
443             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
444                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
445             shardManager2.tell(new RoleChangeNotification(memberId2,
446                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
447
448             shardManager1.underlyingActor().waitForMemberUp();
449
450             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
451
452             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
453             String path = found.getPrimaryPath();
454             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
455             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
456
457             shardManager2.underlyingActor().verifyFindPrimary();
458
459             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
460
461             shardManager1.underlyingActor().waitForMemberRemoved();
462
463             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
464
465             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
466         }};
467
468         JavaTestKit.shutdownActorSystem(system1);
469         JavaTestKit.shutdownActorSystem(system2);
470     }
471
472     @Test
473     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
474         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
475
476         // Create an ActorSystem ShardManager actor for member-1.
477
478         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
479         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
480
481         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
482
483         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
484             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
485                 new MockConfiguration()), shardManagerID);
486
487         // Create an ActorSystem ShardManager actor for member-2.
488
489         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
490
491         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
492
493         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
494
495         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
496             put("default", Arrays.asList("member-1", "member-2")).build());
497
498         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
499             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
500                 mockConfig2), shardManagerID);
501
502         new JavaTestKit(system1) {{
503
504             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
505             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
506             shardManager1.tell(new ActorInitialized(), mockShardActor1);
507             shardManager2.tell(new ActorInitialized(), mockShardActor2);
508
509             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
510             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
511             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
512                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
513             shardManager1.tell(new RoleChangeNotification(memberId1,
514                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
515             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
516                     DataStoreVersions.CURRENT_VERSION),
517                 mockShardActor2);
518             shardManager2.tell(new RoleChangeNotification(memberId2,
519                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
520             shardManager1.underlyingActor().waitForMemberUp();
521
522             shardManager1.tell(new FindPrimary("default", true), getRef());
523
524             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
525             String path = found.getPrimaryPath();
526             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
527
528             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
529                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
530
531             shardManager1.underlyingActor().waitForUnreachableMember();
532
533             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
534             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
535             MessageCollectorActor.clearMessages(mockShardActor1);
536
537             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
538                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
539
540             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
541
542             shardManager1.tell(new FindPrimary("default", true), getRef());
543
544             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
545
546             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
547                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
548
549             shardManager1.underlyingActor().waitForReachableMember();
550
551             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
552             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
553             MessageCollectorActor.clearMessages(mockShardActor1);
554
555             shardManager1.tell(new FindPrimary("default", true), getRef());
556
557             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
558             String path1 = found1.getPrimaryPath();
559             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
560
561             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
562                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
563
564             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
565
566         }};
567
568         JavaTestKit.shutdownActorSystem(system1);
569         JavaTestKit.shutdownActorSystem(system2);
570     }
571
572     @Test
573     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
574         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
575
576         // Create an ActorSystem ShardManager actor for member-1.
577
578         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
579         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
580
581         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
582
583         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
584             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
585                 new MockConfiguration()), shardManagerID);
586
587         // Create an ActorSystem ShardManager actor for member-2.
588
589         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
590
591         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
592
593         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
594
595         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
596             put("default", Arrays.asList("member-1", "member-2")).build());
597
598         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
599             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
600                 mockConfig2), shardManagerID);
601
602         new JavaTestKit(system1) {{
603
604             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
605             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
606             shardManager1.tell(new ActorInitialized(), mockShardActor1);
607             shardManager2.tell(new ActorInitialized(), mockShardActor2);
608
609             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
610             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
611             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
612                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
613             shardManager1.tell(new RoleChangeNotification(memberId1,
614                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
615             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
616                     DataStoreVersions.CURRENT_VERSION),
617                 mockShardActor2);
618             shardManager2.tell(new RoleChangeNotification(memberId2,
619                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
620             shardManager1.underlyingActor().waitForMemberUp();
621
622             shardManager1.tell(new FindPrimary("default", true), getRef());
623
624             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
625             String path = found.getPrimaryPath();
626             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
627
628             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
629                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
630
631             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
632                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
633
634             shardManager1.underlyingActor().waitForUnreachableMember();
635
636             shardManager1.tell(new FindPrimary("default", true), getRef());
637
638             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
639
640             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
641
642             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
643                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
644             shardManager1.tell(new RoleChangeNotification(memberId1,
645                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
646
647             shardManager1.tell(new FindPrimary("default", true), getRef());
648
649             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
650             String path1 = found1.getPrimaryPath();
651             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
652
653         }};
654
655         JavaTestKit.shutdownActorSystem(system1);
656         JavaTestKit.shutdownActorSystem(system2);
657     }
658
659
660     @Test
661     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
662         new JavaTestKit(getSystem()) {{
663             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
664
665             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
666
667             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
668
669             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
670
671             assertEquals("getShardName", "non-existent", notFound.getShardName());
672         }};
673     }
674
675     @Test
676     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
677         new JavaTestKit(getSystem()) {{
678             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
679
680             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
681             shardManager.tell(new ActorInitialized(), mockShardActor);
682
683             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
684
685             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
686
687             assertTrue("Found path contains " + found.getPath().path().toString(),
688                     found.getPath().path().toString().contains("member-1-shard-default-config"));
689         }};
690     }
691
692     @Test
693     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
694         new JavaTestKit(getSystem()) {{
695             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
696
697             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
698
699             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
700         }};
701     }
702
703     @Test
704     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
705         new JavaTestKit(getSystem()) {{
706             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
707
708             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
709
710             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
711             // delayed until we send ActorInitialized.
712             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
713                     new Timeout(5, TimeUnit.SECONDS));
714
715             shardManager.tell(new ActorInitialized(), mockShardActor);
716
717             Object resp = Await.result(future, duration("5 seconds"));
718             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
719         }};
720     }
721
722     @Test
723     public void testOnRecoveryJournalIsCleaned() {
724         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
725                 ImmutableSet.of("foo")));
726         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
727                 ImmutableSet.of("bar")));
728         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
729
730         new JavaTestKit(getSystem()) {{
731             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
732                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
733
734             shardManager.underlyingActor().waitForRecoveryComplete();
735             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
736
737             // Journal entries up to the last one should've been deleted
738             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
739             synchronized (journal) {
740                 assertEquals("Journal size", 0, journal.size());
741             }
742         }};
743     }
744
745     @Test
746     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
747         new JavaTestKit(getSystem()) {
748             {
749                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
750
751                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
752                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
753                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
754
755                 verify(ready, never()).countDown();
756
757                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
758                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
759
760                 verify(ready, times(1)).countDown();
761
762             }};
763     }
764
765     @Test
766     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
767         new JavaTestKit(getSystem()) {
768             {
769                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
770
771                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
772                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
773                         memberId, null, RaftState.Follower.name()));
774
775                 verify(ready, never()).countDown();
776
777                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
778
779                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
780                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
781                         DataStoreVersions.CURRENT_VERSION));
782
783                 verify(ready, times(1)).countDown();
784
785             }};
786     }
787
788     @Test
789     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
790         new JavaTestKit(getSystem()) {
791             {
792                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
793
794                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
795                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
796                         memberId, null, RaftState.Follower.name()));
797
798                 verify(ready, never()).countDown();
799
800                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
801                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
802                         DataStoreVersions.CURRENT_VERSION));
803
804                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
805
806                 verify(ready, times(1)).countDown();
807
808             }};
809     }
810
811     @Test
812     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
813         new JavaTestKit(getSystem()) {
814             {
815                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
816
817                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
818                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
819
820                 verify(ready, never()).countDown();
821
822             }};
823     }
824
825
826     @Test
827     public void testByDefaultSyncStatusIsFalse() throws Exception{
828         final Props persistentProps = newShardMgrProps();
829         final TestActorRef<ShardManager> shardManager =
830                 TestActorRef.create(getSystem(), persistentProps);
831
832         ShardManager shardManagerActor = shardManager.underlyingActor();
833
834         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
835     }
836
837     @Test
838     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
839         final Props persistentProps = ShardManager.props(
840                 new MockClusterWrapper(),
841                 new MockConfiguration(),
842                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
843         final TestActorRef<ShardManager> shardManager =
844                 TestActorRef.create(getSystem(), persistentProps);
845
846         ShardManager shardManagerActor = shardManager.underlyingActor();
847         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
848                 RaftState.Follower.name(), RaftState.Leader.name()));
849
850         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
851     }
852
853     @Test
854     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
855         final Props persistentProps = newShardMgrProps();
856         final TestActorRef<ShardManager> shardManager =
857                 TestActorRef.create(getSystem(), persistentProps);
858
859         ShardManager shardManagerActor = shardManager.underlyingActor();
860         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
861                 RaftState.Follower.name(), RaftState.Candidate.name()));
862
863         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
864
865         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
866         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
867
868         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
869     }
870
871     @Test
872     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
873         final Props persistentProps = ShardManager.props(
874                 new MockClusterWrapper(),
875                 new MockConfiguration(),
876                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
877         final TestActorRef<ShardManager> shardManager =
878                 TestActorRef.create(getSystem(), persistentProps);
879
880         ShardManager shardManagerActor = shardManager.underlyingActor();
881         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
882                 RaftState.Candidate.name(), RaftState.Follower.name()));
883
884         // Initially will be false
885         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
886
887         // Send status true will make sync status true
888         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
889
890         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
891
892         // Send status false will make sync status false
893         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
894
895         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
896
897     }
898
899     @Test
900     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
901         final Props persistentProps = ShardManager.props(
902                 new MockClusterWrapper(),
903                 new MockConfiguration() {
904                     @Override
905                     public List<String> getMemberShardNames(String memberName) {
906                         return Arrays.asList("default", "astronauts");
907                     }
908                 },
909                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
910         final TestActorRef<ShardManager> shardManager =
911                 TestActorRef.create(getSystem(), persistentProps);
912
913         ShardManager shardManagerActor = shardManager.underlyingActor();
914
915         // Initially will be false
916         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
917
918         // Make default shard leader
919         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
920                 RaftState.Follower.name(), RaftState.Leader.name()));
921
922         // default = Leader, astronauts is unknown so sync status remains false
923         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
924
925         // Make astronauts shard leader as well
926         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
927                 RaftState.Follower.name(), RaftState.Leader.name()));
928
929         // Now sync status should be true
930         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
931
932         // Make astronauts a Follower
933         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
934                 RaftState.Leader.name(), RaftState.Follower.name()));
935
936         // Sync status is not true
937         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
938
939         // Make the astronauts follower sync status true
940         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
941
942         // Sync status is now true
943         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
944
945     }
946
947     @Test
948     public void testOnReceiveSwitchShardBehavior() throws Exception {
949         new JavaTestKit(getSystem()) {{
950             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
951
952             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
953             shardManager.tell(new ActorInitialized(), mockShardActor);
954
955             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
956
957             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
958
959             assertEquals(RaftState.Leader, switchBehavior.getNewState());
960             assertEquals(1000, switchBehavior.getNewTerm());
961         }};
962     }
963
964     public void testOnReceiveCreateShard() {
965         new JavaTestKit(getSystem()) {{
966             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
967
968             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
969                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
970
971             SchemaContext schemaContext = TestModel.createTestContext();
972             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
973
974             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
975                     persistent(false).build();
976             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
977
978             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
979                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
980             shardManager.tell(new CreateShard(config, shardPropsCreator, datastoreContext), getRef());
981
982             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
983
984             shardManager.tell(new FindLocalShard("foo", true), getRef());
985
986             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
987
988             assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
989             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
990                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
991                     shardPropsCreator.peerAddresses.keySet());
992             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
993                     shardPropsCreator.shardId);
994             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
995
996             // Send CreateShard with same name - should fail.
997
998             shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
999
1000             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1001         }};
1002     }
1003
1004     @Test
1005     public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
1006         new JavaTestKit(getSystem()) {{
1007             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1008                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1009
1010             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
1011
1012             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1013                     "foo", null, Arrays.asList("member-1"));
1014             shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
1015
1016             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
1017
1018             SchemaContext schemaContext = TestModel.createTestContext();
1019             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1020
1021             shardManager.tell(new FindLocalShard("foo", true), getRef());
1022
1023             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1024
1025             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
1026             assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
1027         }};
1028     }
1029
1030     private static class TestShardPropsCreator implements ShardPropsCreator {
1031         ShardIdentifier shardId;
1032         Map<String, String> peerAddresses;
1033         SchemaContext schemaContext;
1034         DatastoreContext datastoreContext;
1035
1036         @Override
1037         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
1038                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1039             this.shardId = shardId;
1040             this.peerAddresses = peerAddresses;
1041             this.schemaContext = schemaContext;
1042             this.datastoreContext = datastoreContext;
1043             return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1044         }
1045
1046     }
1047
1048     private static class TestShardManager extends ShardManager {
1049         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1050
1051         TestShardManager(String shardMrgIDSuffix) {
1052             super(new MockClusterWrapper(), new MockConfiguration(),
1053                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1054                     new PrimaryShardInfoFutureCache());
1055         }
1056
1057         @Override
1058         public void handleRecover(Object message) throws Exception {
1059             try {
1060                 super.handleRecover(message);
1061             } finally {
1062                 if(message instanceof RecoveryCompleted) {
1063                     recoveryComplete.countDown();
1064                 }
1065             }
1066         }
1067
1068         void waitForRecoveryComplete() {
1069             assertEquals("Recovery complete", true,
1070                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1071         }
1072     }
1073
1074     @SuppressWarnings("serial")
1075     static class TestShardManagerCreator implements Creator<TestShardManager> {
1076         String shardMrgIDSuffix;
1077
1078         TestShardManagerCreator(String shardMrgIDSuffix) {
1079             this.shardMrgIDSuffix = shardMrgIDSuffix;
1080         }
1081
1082         @Override
1083         public TestShardManager create() throws Exception {
1084             return new TestShardManager(shardMrgIDSuffix);
1085         }
1086
1087     }
1088
1089     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1090         private static final long serialVersionUID = 1L;
1091         private final Creator<ShardManager> delegate;
1092
1093         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1094             this.delegate = delegate;
1095         }
1096
1097         @Override
1098         public ShardManager create() throws Exception {
1099             return delegate.create();
1100         }
1101     }
1102
1103     private static class ForwardingShardManager extends ShardManager {
1104         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1105         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1106         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1107         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1108         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1109         private final ActorRef shardActor;
1110         private final String name;
1111
1112         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1113                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1114                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1115             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1116             this.shardActor = shardActor;
1117             this.name = name;
1118         }
1119
1120         @Override
1121         public void handleCommand(Object message) throws Exception {
1122             try{
1123                 super.handleCommand(message);
1124             } finally {
1125                 if(message instanceof FindPrimary) {
1126                     findPrimaryMessageReceived.countDown();
1127                 } else if(message instanceof ClusterEvent.MemberUp) {
1128                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1129                     if(!getCluster().getCurrentMemberName().equals(role)) {
1130                         memberUpReceived.countDown();
1131                     }
1132                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1133                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1134                     if(!getCluster().getCurrentMemberName().equals(role)) {
1135                         memberRemovedReceived.countDown();
1136                     }
1137                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1138                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1139                     if(!getCluster().getCurrentMemberName().equals(role)) {
1140                         memberUnreachableReceived.countDown();
1141                     }
1142                 } else if(message instanceof ClusterEvent.ReachableMember) {
1143                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1144                     if(!getCluster().getCurrentMemberName().equals(role)) {
1145                         memberReachableReceived.countDown();
1146                     }
1147                 }
1148             }
1149         }
1150
1151         @Override
1152         public String persistenceId() {
1153             return name;
1154         }
1155
1156         @Override
1157         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1158             return shardActor;
1159         }
1160
1161         void waitForMemberUp() {
1162             assertEquals("MemberUp received", true,
1163                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1164             memberUpReceived = new CountDownLatch(1);
1165         }
1166
1167         void waitForMemberRemoved() {
1168             assertEquals("MemberRemoved received", true,
1169                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1170             memberRemovedReceived = new CountDownLatch(1);
1171         }
1172
1173         void waitForUnreachableMember() {
1174             assertEquals("UnreachableMember received", true,
1175                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1176                 ));
1177             memberUnreachableReceived = new CountDownLatch(1);
1178         }
1179
1180         void waitForReachableMember() {
1181             assertEquals("ReachableMember received", true,
1182                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1183             memberReachableReceived = new CountDownLatch(1);
1184         }
1185
1186         void verifyFindPrimary() {
1187             assertEquals("FindPrimary received", true,
1188                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1189             findPrimaryMessageReceived = new CountDownLatch(1);
1190         }
1191     }
1192 }