1ffa6790d8477f30755c1ce4513e02ef48321e18
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.cluster.Cluster;
25 import akka.cluster.ClusterEvent;
26 import akka.dispatch.Dispatchers;
27 import akka.japi.Creator;
28 import akka.pattern.Patterns;
29 import akka.persistence.RecoveryCompleted;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.ImmutableSet;
36 import com.google.common.collect.Sets;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import com.typesafe.config.ConfigFactory;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
56 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
59 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
60 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
61 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
62 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
63 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
64 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
65 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
66 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
67 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
68 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
69 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
70 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
71 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
72 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
73 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
74 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
75 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
76 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
77 import org.opendaylight.controller.cluster.raft.RaftState;
78 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
79 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
80 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
81 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
82 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
83 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
84 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
85 import scala.concurrent.Await;
86 import scala.concurrent.Future;
87 import scala.concurrent.duration.FiniteDuration;
88
89 public class ShardManagerTest extends AbstractActorTest {
90     private static int ID_COUNTER = 1;
91
92     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
93     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
94
95     @Mock
96     private static CountDownLatch ready;
97
98     private static TestActorRef<MessageCollectorActor> mockShardActor;
99
100     private static String mockShardName;
101
102     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
103             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
104                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
105
106     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
107         String name = new ShardIdentifier(shardName, memberName,"config").toString();
108         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
109     }
110
111     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
112
113     @Before
114     public void setUp() {
115         MockitoAnnotations.initMocks(this);
116
117         InMemoryJournal.clear();
118
119         if(mockShardActor == null) {
120             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
121             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
122         }
123
124         mockShardActor.underlyingActor().clear();
125     }
126
127     @After
128     public void tearDown() {
129         InMemoryJournal.clear();
130     }
131
132     private Props newShardMgrProps(boolean persistent) {
133         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
134                 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
135     }
136
137     private Props newPropsShardMgrWithMockShardActor() {
138         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
139                 new MockConfiguration());
140     }
141
142     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
143             final ClusterWrapper clusterWrapper, final Configuration config) {
144         Creator<ShardManager> creator = new Creator<ShardManager>() {
145             private static final long serialVersionUID = 1L;
146             @Override
147             public ShardManager create() throws Exception {
148                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
149                         ready, name, shardActor, primaryShardInfoCache);
150             }
151         };
152
153         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
154     }
155
156     @Test
157     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
158         new JavaTestKit(getSystem()) {{
159             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
160
161             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
162
163             shardManager.tell(new FindPrimary("non-existent", false), getRef());
164
165             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
166         }};
167     }
168
169     @Test
170     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
171         new JavaTestKit(getSystem()) {{
172             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
173
174             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
175
176             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
177             shardManager.tell(new ActorInitialized(), mockShardActor);
178
179             DataTree mockDataTree = mock(DataTree.class);
180             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
181                     DataStoreVersions.CURRENT_VERSION), getRef());
182
183             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
184             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
185                     RaftState.Leader.name())), mockShardActor);
186
187             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
188
189             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
190             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
191                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
192             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
193         }};
194     }
195
196     @Test
197     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
198         new JavaTestKit(getSystem()) {{
199             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
200
201             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
202             shardManager.tell(new ActorInitialized(), mockShardActor);
203
204             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
205             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
206             shardManager.tell(new RoleChangeNotification(memberId1,
207                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
208             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
209
210             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
211
212             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
213         }};
214     }
215
216     @Test
217     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
218         new JavaTestKit(getSystem()) {{
219             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
220
221             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
222             shardManager.tell(new ActorInitialized(), mockShardActor);
223
224             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
225             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
226
227             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
228             shardManager.tell(new RoleChangeNotification(memberId1,
229                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
230             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
231             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
232                     leaderVersion), mockShardActor);
233
234             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
235
236             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
237             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
238                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
239             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
240         }};
241     }
242
243     @Test
244     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
245         new JavaTestKit(getSystem()) {{
246             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
247
248             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
249
250             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
251         }};
252     }
253
254     @Test
255     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
256         new JavaTestKit(getSystem()) {{
257             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
258
259             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
260             shardManager.tell(new ActorInitialized(), mockShardActor);
261
262             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
263
264             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
265         }};
266     }
267
268     @Test
269     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
270         new JavaTestKit(getSystem()) {{
271             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
272
273             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
274             shardManager.tell(new ActorInitialized(), mockShardActor);
275
276             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
277             shardManager.tell(new RoleChangeNotification(memberId,
278                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
279
280             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
281
282             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
283
284             DataTree mockDataTree = mock(DataTree.class);
285             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
286                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
287
288             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
289
290             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
291             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
292                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
293             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
294         }};
295     }
296
297     @Test
298     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
299         new JavaTestKit(getSystem()) {{
300             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
301
302             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
303
304             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
305             // delayed until we send ActorInitialized and RoleChangeNotification.
306             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
307
308             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
309
310             shardManager.tell(new ActorInitialized(), mockShardActor);
311
312             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
313
314             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
315             shardManager.tell(new RoleChangeNotification(memberId,
316                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
317
318             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
319
320             DataTree mockDataTree = mock(DataTree.class);
321             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
322                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
323
324             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
325             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
326                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
327             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
328
329             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
330         }};
331     }
332
333     @Test
334     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
335         new JavaTestKit(getSystem()) {{
336             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
337
338             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
339
340             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
341
342             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
343
344             shardManager.tell(new ActorInitialized(), mockShardActor);
345
346             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
347         }};
348     }
349
350     @Test
351     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
352         new JavaTestKit(getSystem()) {{
353             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
354
355             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
356             shardManager.tell(new ActorInitialized(), mockShardActor);
357             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
358                     null, RaftState.Candidate.name()), mockShardActor);
359
360             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
361
362             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
363         }};
364     }
365
366     @Test
367     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
368         new JavaTestKit(getSystem()) {{
369             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
370
371             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
372             shardManager.tell(new ActorInitialized(), mockShardActor);
373             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
374                     null, RaftState.IsolatedLeader.name()), mockShardActor);
375
376             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
377
378             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
379         }};
380     }
381
382     @Test
383     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
384         new JavaTestKit(getSystem()) {{
385             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
386
387             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
388             shardManager.tell(new ActorInitialized(), mockShardActor);
389
390             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
391
392             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
393         }};
394     }
395
396     @Test
397     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
398         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
399
400         // Create an ActorSystem ShardManager actor for member-1.
401
402         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
403         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
404
405         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
406
407         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
408                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
409                         new MockConfiguration()), shardManagerID);
410
411         // Create an ActorSystem ShardManager actor for member-2.
412
413         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
414
415         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
416
417         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
418
419         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
420                 put("default", Arrays.asList("member-1", "member-2")).
421                 put("astronauts", Arrays.asList("member-2")).build());
422
423         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
424                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
425                         mockConfig2), shardManagerID);
426
427         new JavaTestKit(system1) {{
428
429             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
430             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
431
432             shardManager2.tell(new ActorInitialized(), mockShardActor2);
433
434             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
435             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
436             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
437                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
438             shardManager2.tell(new RoleChangeNotification(memberId2,
439                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
440
441             shardManager1.underlyingActor().waitForMemberUp();
442
443             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
444
445             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
446             String path = found.getPrimaryPath();
447             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
448             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
449
450             shardManager2.underlyingActor().verifyFindPrimary();
451
452             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
453
454             shardManager1.underlyingActor().waitForMemberRemoved();
455
456             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
457
458             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
459         }};
460
461         JavaTestKit.shutdownActorSystem(system1);
462         JavaTestKit.shutdownActorSystem(system2);
463     }
464
465     @Test
466     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
467         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
468
469         // Create an ActorSystem ShardManager actor for member-1.
470
471         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
472         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
473
474         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
475
476         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
477             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
478                 new MockConfiguration()), shardManagerID);
479
480         // Create an ActorSystem ShardManager actor for member-2.
481
482         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
483
484         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
485
486         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
487
488         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
489             put("default", Arrays.asList("member-1", "member-2")).build());
490
491         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
492             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
493                 mockConfig2), shardManagerID);
494
495         new JavaTestKit(system1) {{
496
497             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
498             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
499             shardManager1.tell(new ActorInitialized(), mockShardActor1);
500             shardManager2.tell(new ActorInitialized(), mockShardActor2);
501
502             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
503             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
504             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
505                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
506             shardManager1.tell(new RoleChangeNotification(memberId1,
507                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
508             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
509                     DataStoreVersions.CURRENT_VERSION),
510                 mockShardActor2);
511             shardManager2.tell(new RoleChangeNotification(memberId2,
512                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
513             shardManager1.underlyingActor().waitForMemberUp();
514
515             shardManager1.tell(new FindPrimary("default", true), getRef());
516
517             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
518             String path = found.getPrimaryPath();
519             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
520
521             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
522                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
523
524             shardManager1.underlyingActor().waitForUnreachableMember();
525
526             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
527             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
528             MessageCollectorActor.clearMessages(mockShardActor1);
529
530             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
531                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
532
533             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
534
535             shardManager1.tell(new FindPrimary("default", true), getRef());
536
537             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
538
539             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
540                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
541
542             shardManager1.underlyingActor().waitForReachableMember();
543
544             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
545             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
546             MessageCollectorActor.clearMessages(mockShardActor1);
547
548             shardManager1.tell(new FindPrimary("default", true), getRef());
549
550             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
551             String path1 = found1.getPrimaryPath();
552             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
553
554             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
555                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
556
557             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
558
559         }};
560
561         JavaTestKit.shutdownActorSystem(system1);
562         JavaTestKit.shutdownActorSystem(system2);
563     }
564
565     @Test
566     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
567         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
568
569         // Create an ActorSystem ShardManager actor for member-1.
570
571         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
572         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
573
574         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
575
576         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
577             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
578                 new MockConfiguration()), shardManagerID);
579
580         // Create an ActorSystem ShardManager actor for member-2.
581
582         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
583
584         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
585
586         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
587
588         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
589             put("default", Arrays.asList("member-1", "member-2")).build());
590
591         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
592             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
593                 mockConfig2), shardManagerID);
594
595         new JavaTestKit(system1) {{
596
597             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
598             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
599             shardManager1.tell(new ActorInitialized(), mockShardActor1);
600             shardManager2.tell(new ActorInitialized(), mockShardActor2);
601
602             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
603             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
604             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
605                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
606             shardManager1.tell(new RoleChangeNotification(memberId1,
607                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
608             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
609                     DataStoreVersions.CURRENT_VERSION),
610                 mockShardActor2);
611             shardManager2.tell(new RoleChangeNotification(memberId2,
612                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
613             shardManager1.underlyingActor().waitForMemberUp();
614
615             shardManager1.tell(new FindPrimary("default", true), getRef());
616
617             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
618             String path = found.getPrimaryPath();
619             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
620
621             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
622                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
623
624             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
625                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
626
627             shardManager1.underlyingActor().waitForUnreachableMember();
628
629             shardManager1.tell(new FindPrimary("default", true), getRef());
630
631             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
632
633             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
634
635             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
636                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
637             shardManager1.tell(new RoleChangeNotification(memberId1,
638                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
639
640             shardManager1.tell(new FindPrimary("default", true), getRef());
641
642             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
643             String path1 = found1.getPrimaryPath();
644             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
645
646         }};
647
648         JavaTestKit.shutdownActorSystem(system1);
649         JavaTestKit.shutdownActorSystem(system2);
650     }
651
652
653     @Test
654     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
655         new JavaTestKit(getSystem()) {{
656             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
657
658             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
659
660             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
661
662             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
663
664             assertEquals("getShardName", "non-existent", notFound.getShardName());
665         }};
666     }
667
668     @Test
669     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
670         new JavaTestKit(getSystem()) {{
671             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
672
673             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
674             shardManager.tell(new ActorInitialized(), mockShardActor);
675
676             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
677
678             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
679
680             assertTrue("Found path contains " + found.getPath().path().toString(),
681                     found.getPath().path().toString().contains("member-1-shard-default-config"));
682         }};
683     }
684
685     @Test
686     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
687         new JavaTestKit(getSystem()) {{
688             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
689
690             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
691
692             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
693         }};
694     }
695
696     @Test
697     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
698         new JavaTestKit(getSystem()) {{
699             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
700
701             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
702
703             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
704             // delayed until we send ActorInitialized.
705             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
706                     new Timeout(5, TimeUnit.SECONDS));
707
708             shardManager.tell(new ActorInitialized(), mockShardActor);
709
710             Object resp = Await.result(future, duration("5 seconds"));
711             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
712         }};
713     }
714
715     @Test
716     public void testOnRecoveryJournalIsCleaned() {
717         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
718                 ImmutableSet.of("foo")));
719         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
720                 ImmutableSet.of("bar")));
721         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
722
723         new JavaTestKit(getSystem()) {{
724             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
725                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
726
727             shardManager.underlyingActor().waitForRecoveryComplete();
728             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
729
730             // Journal entries up to the last one should've been deleted
731             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
732             synchronized (journal) {
733                 assertEquals("Journal size", 0, journal.size());
734             }
735         }};
736     }
737
738     @Test
739     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
740         new JavaTestKit(getSystem()) {
741             {
742                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
743
744                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
745                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
746                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
747
748                 verify(ready, never()).countDown();
749
750                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
751                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
752
753                 verify(ready, times(1)).countDown();
754
755             }};
756     }
757
758     @Test
759     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
760         new JavaTestKit(getSystem()) {
761             {
762                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
763
764                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
765                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
766                         memberId, null, RaftState.Follower.name()));
767
768                 verify(ready, never()).countDown();
769
770                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
771
772                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
773                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
774                         DataStoreVersions.CURRENT_VERSION));
775
776                 verify(ready, times(1)).countDown();
777
778             }};
779     }
780
781     @Test
782     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
783         new JavaTestKit(getSystem()) {
784             {
785                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
786
787                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
788                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
789                         memberId, null, RaftState.Follower.name()));
790
791                 verify(ready, never()).countDown();
792
793                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
794                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
795                         DataStoreVersions.CURRENT_VERSION));
796
797                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
798
799                 verify(ready, times(1)).countDown();
800
801             }};
802     }
803
804     @Test
805     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
806         new JavaTestKit(getSystem()) {
807             {
808                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
809
810                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
811                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
812
813                 verify(ready, never()).countDown();
814
815             }};
816     }
817
818
819     @Test
820     public void testByDefaultSyncStatusIsFalse() throws Exception{
821         final Props persistentProps = newShardMgrProps(true);
822         final TestActorRef<ShardManager> shardManager =
823                 TestActorRef.create(getSystem(), persistentProps);
824
825         ShardManager shardManagerActor = shardManager.underlyingActor();
826
827         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
828     }
829
830     @Test
831     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
832         final Props persistentProps = ShardManager.props(
833                 new MockClusterWrapper(),
834                 new MockConfiguration(),
835                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
836         final TestActorRef<ShardManager> shardManager =
837                 TestActorRef.create(getSystem(), persistentProps);
838
839         ShardManager shardManagerActor = shardManager.underlyingActor();
840         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
841                 RaftState.Follower.name(), RaftState.Leader.name()));
842
843         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
844     }
845
846     @Test
847     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
848         final Props persistentProps = newShardMgrProps(true);
849         final TestActorRef<ShardManager> shardManager =
850                 TestActorRef.create(getSystem(), persistentProps);
851
852         ShardManager shardManagerActor = shardManager.underlyingActor();
853         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
854                 RaftState.Follower.name(), RaftState.Candidate.name()));
855
856         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
857
858         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
859         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
860
861         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
862     }
863
864     @Test
865     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
866         final Props persistentProps = ShardManager.props(
867                 new MockClusterWrapper(),
868                 new MockConfiguration(),
869                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
870         final TestActorRef<ShardManager> shardManager =
871                 TestActorRef.create(getSystem(), persistentProps);
872
873         ShardManager shardManagerActor = shardManager.underlyingActor();
874         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
875                 RaftState.Candidate.name(), RaftState.Follower.name()));
876
877         // Initially will be false
878         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
879
880         // Send status true will make sync status true
881         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
882
883         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
884
885         // Send status false will make sync status false
886         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
887
888         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
889
890     }
891
892     @Test
893     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
894         final Props persistentProps = ShardManager.props(
895                 new MockClusterWrapper(),
896                 new MockConfiguration() {
897                     @Override
898                     public List<String> getMemberShardNames(String memberName) {
899                         return Arrays.asList("default", "astronauts");
900                     }
901                 },
902                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
903         final TestActorRef<ShardManager> shardManager =
904                 TestActorRef.create(getSystem(), persistentProps);
905
906         ShardManager shardManagerActor = shardManager.underlyingActor();
907
908         // Initially will be false
909         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
910
911         // Make default shard leader
912         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
913                 RaftState.Follower.name(), RaftState.Leader.name()));
914
915         // default = Leader, astronauts is unknown so sync status remains false
916         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
917
918         // Make astronauts shard leader as well
919         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
920                 RaftState.Follower.name(), RaftState.Leader.name()));
921
922         // Now sync status should be true
923         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
924
925         // Make astronauts a Follower
926         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
927                 RaftState.Leader.name(), RaftState.Follower.name()));
928
929         // Sync status is not true
930         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
931
932         // Make the astronauts follower sync status true
933         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
934
935         // Sync status is now true
936         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
937
938     }
939
940     @Test
941     public void testOnReceiveSwitchShardBehavior() throws Exception {
942         new JavaTestKit(getSystem()) {{
943             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
944
945             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
946             shardManager.tell(new ActorInitialized(), mockShardActor);
947
948             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
949
950             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
951
952             assertEquals(RaftState.Leader, switchBehavior.getNewState());
953             assertEquals(1000, switchBehavior.getNewTerm());
954         }};
955     }
956
957     public void testOnReceiveCreateShard() {
958         new JavaTestKit(getSystem()) {{
959             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
960
961             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
962
963             SchemaContext schemaContext = TestModel.createTestContext();
964             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
965
966             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
967                     persistent(false).build();
968             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
969
970             shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator,
971                     datastoreContext), getRef());
972
973             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
974
975             shardManager.tell(new FindLocalShard("foo", true), getRef());
976
977             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
978
979             assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
980             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
981                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
982                     shardPropsCreator.peerAddresses.keySet());
983             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
984                     shardPropsCreator.shardId);
985             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
986
987             // Send CreateShard with same name - should fail.
988
989             shardManager.tell(new CreateShard("foo", Collections.<String>emptyList(), shardPropsCreator, null), getRef());
990
991             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
992         }};
993     }
994
995     @Test
996     public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
997         new JavaTestKit(getSystem()) {{
998             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
999
1000             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
1001
1002             shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef());
1003
1004             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
1005
1006             SchemaContext schemaContext = TestModel.createTestContext();
1007             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1008
1009             shardManager.tell(new FindLocalShard("foo", true), getRef());
1010
1011             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1012
1013             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
1014             assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
1015         }};
1016     }
1017
1018     private static class TestShardPropsCreator implements ShardPropsCreator {
1019         ShardIdentifier shardId;
1020         Map<String, String> peerAddresses;
1021         SchemaContext schemaContext;
1022         DatastoreContext datastoreContext;
1023
1024         @Override
1025         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
1026                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1027             this.shardId = shardId;
1028             this.peerAddresses = peerAddresses;
1029             this.schemaContext = schemaContext;
1030             this.datastoreContext = datastoreContext;
1031             return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1032         }
1033
1034     }
1035
1036     private static class TestShardManager extends ShardManager {
1037         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1038
1039         TestShardManager(String shardMrgIDSuffix) {
1040             super(new MockClusterWrapper(), new MockConfiguration(),
1041                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1042                     new PrimaryShardInfoFutureCache());
1043         }
1044
1045         @Override
1046         public void handleRecover(Object message) throws Exception {
1047             try {
1048                 super.handleRecover(message);
1049             } finally {
1050                 if(message instanceof RecoveryCompleted) {
1051                     recoveryComplete.countDown();
1052                 }
1053             }
1054         }
1055
1056         void waitForRecoveryComplete() {
1057             assertEquals("Recovery complete", true,
1058                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1059         }
1060     }
1061
1062     @SuppressWarnings("serial")
1063     static class TestShardManagerCreator implements Creator<TestShardManager> {
1064         String shardMrgIDSuffix;
1065
1066         TestShardManagerCreator(String shardMrgIDSuffix) {
1067             this.shardMrgIDSuffix = shardMrgIDSuffix;
1068         }
1069
1070         @Override
1071         public TestShardManager create() throws Exception {
1072             return new TestShardManager(shardMrgIDSuffix);
1073         }
1074
1075     }
1076
1077     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1078         private static final long serialVersionUID = 1L;
1079         private final Creator<ShardManager> delegate;
1080
1081         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1082             this.delegate = delegate;
1083         }
1084
1085         @Override
1086         public ShardManager create() throws Exception {
1087             return delegate.create();
1088         }
1089     }
1090
1091     private static class ForwardingShardManager extends ShardManager {
1092         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1093         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1094         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1095         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1096         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1097         private final ActorRef shardActor;
1098         private final String name;
1099
1100         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1101                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1102                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1103             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1104             this.shardActor = shardActor;
1105             this.name = name;
1106         }
1107
1108         @Override
1109         public void handleCommand(Object message) throws Exception {
1110             try{
1111                 super.handleCommand(message);
1112             } finally {
1113                 if(message instanceof FindPrimary) {
1114                     findPrimaryMessageReceived.countDown();
1115                 } else if(message instanceof ClusterEvent.MemberUp) {
1116                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1117                     if(!getCluster().getCurrentMemberName().equals(role)) {
1118                         memberUpReceived.countDown();
1119                     }
1120                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1121                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1122                     if(!getCluster().getCurrentMemberName().equals(role)) {
1123                         memberRemovedReceived.countDown();
1124                     }
1125                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1126                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1127                     if(!getCluster().getCurrentMemberName().equals(role)) {
1128                         memberUnreachableReceived.countDown();
1129                     }
1130                 } else if(message instanceof ClusterEvent.ReachableMember) {
1131                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1132                     if(!getCluster().getCurrentMemberName().equals(role)) {
1133                         memberReachableReceived.countDown();
1134                     }
1135                 }
1136             }
1137         }
1138
1139         @Override
1140         public String persistenceId() {
1141             return name;
1142         }
1143
1144         @Override
1145         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1146             return shardActor;
1147         }
1148
1149         void waitForMemberUp() {
1150             assertEquals("MemberUp received", true,
1151                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1152             memberUpReceived = new CountDownLatch(1);
1153         }
1154
1155         void waitForMemberRemoved() {
1156             assertEquals("MemberRemoved received", true,
1157                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1158             memberRemovedReceived = new CountDownLatch(1);
1159         }
1160
1161         void waitForUnreachableMember() {
1162             assertEquals("UnreachableMember received", true,
1163                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1164                 ));
1165             memberUnreachableReceived = new CountDownLatch(1);
1166         }
1167
1168         void waitForReachableMember() {
1169             assertEquals("ReachableMember received", true,
1170                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1171             memberReachableReceived = new CountDownLatch(1);
1172         }
1173
1174         void verifyFindPrimary() {
1175             assertEquals("FindPrimary received", true,
1176                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1177             findPrimaryMessageReceived = new CountDownLatch(1);
1178         }
1179     }
1180 }