Bug 2187: Bootstrap EOS shard when no local shards configured
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.dispatch.Dispatchers;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.RecoveryCompleted;
34 import akka.serialization.Serialization;
35 import akka.testkit.JavaTestKit;
36 import akka.testkit.TestActorRef;
37 import akka.util.Timeout;
38 import com.google.common.base.Function;
39 import com.google.common.base.Optional;
40 import com.google.common.collect.ImmutableMap;
41 import com.google.common.collect.ImmutableSet;
42 import com.google.common.collect.Sets;
43 import com.google.common.util.concurrent.Uninterruptibles;
44 import com.typesafe.config.ConfigFactory;
45 import java.net.URI;
46 import java.util.AbstractMap;
47 import java.util.Arrays;
48 import java.util.Collection;
49 import java.util.Collections;
50 import java.util.HashMap;
51 import java.util.HashSet;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Map.Entry;
55 import java.util.Set;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import org.junit.After;
60 import org.junit.Before;
61 import org.junit.Test;
62 import org.mockito.Mock;
63 import org.mockito.Mockito;
64 import org.mockito.MockitoAnnotations;
65 import org.opendaylight.controller.cluster.datastore.config.Configuration;
66 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
67 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
68 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
69 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
70 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
71 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
72 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
73 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
74 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
75 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
76 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
77 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
78 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
79 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
80 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
81 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
82 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
83 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
84 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
85 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
86 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
87 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
88 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
89 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
90 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
91 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
92 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
93 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
94 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
95 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
96 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
97 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
98 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
99 import org.opendaylight.controller.cluster.raft.RaftState;
100 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
101 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
102 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
103 import org.opendaylight.controller.cluster.raft.messages.AddServer;
104 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
105 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
106 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
107 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
108 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
109 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
110 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
112 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
113 import scala.concurrent.Await;
114 import scala.concurrent.Future;
115 import scala.concurrent.duration.FiniteDuration;
116
117 public class ShardManagerTest extends AbstractActorTest {
118     private static int ID_COUNTER = 1;
119
120     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
121     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
122
123     @Mock
124     private static CountDownLatch ready;
125
126     private static TestActorRef<MessageCollectorActor> mockShardActor;
127
128     private static String mockShardName;
129
130     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
131             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
132                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
133
134     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
135         String name = new ShardIdentifier(shardName, memberName,"config").toString();
136         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
137     }
138
139     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
140
141     @Before
142     public void setUp() {
143         MockitoAnnotations.initMocks(this);
144
145         InMemoryJournal.clear();
146         InMemorySnapshotStore.clear();
147
148         if(mockShardActor == null) {
149             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
150             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
151         }
152
153         mockShardActor.underlyingActor().clear();
154     }
155
156     @After
157     public void tearDown() {
158         InMemoryJournal.clear();
159         InMemorySnapshotStore.clear();
160     }
161
162     private Props newShardMgrProps() {
163         return newShardMgrProps(new MockConfiguration());
164     }
165
166     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
167         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
168         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
169         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
170         return mockFactory;
171     }
172
173     private Props newShardMgrProps(Configuration config) {
174         return TestShardManager.builder(datastoreContextBuilder).configuration(config).props();
175     }
176
177     private Props newPropsShardMgrWithMockShardActor() {
178         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
179                 new MockConfiguration());
180     }
181
182     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
183             final ClusterWrapper clusterWrapper, final Configuration config) {
184         Creator<ShardManager> creator = new Creator<ShardManager>() {
185             private static final long serialVersionUID = 1L;
186             @Override
187             public ShardManager create() throws Exception {
188                 return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config).
189                         datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
190                         waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor);
191             }
192         };
193
194         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
195     }
196
197     private TestShardManager newTestShardManager() {
198         return newTestShardManager(newShardMgrProps());
199     }
200
201     private TestShardManager newTestShardManager(Props props) {
202         TestActorRef<TestShardManager> shardManagerActor = TestActorRef.create(getSystem(), props);
203         TestShardManager shardManager = shardManagerActor.underlyingActor();
204         shardManager.waitForRecoveryComplete();
205         return shardManager;
206     }
207
208     @Test
209     public void testPerShardDatastoreContext() throws Exception {
210         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
211                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
212
213         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
214                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
215
216         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
217                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
218
219         final MockConfiguration mockConfig = new MockConfiguration() {
220             @Override
221             public Collection<String> getMemberShardNames(String memberName) {
222                 return Arrays.asList("default", "topology");
223             }
224
225             @Override
226             public Collection<String> getMembersFromShardName(String shardName) {
227                 return Arrays.asList("member-1");
228             }
229         };
230
231         final TestActorRef<MessageCollectorActor> defaultShardActor = TestActorRef.create(getSystem(),
232                 Props.create(MessageCollectorActor.class), "default");
233         final TestActorRef<MessageCollectorActor> topologyShardActor = TestActorRef.create(getSystem(),
234                 Props.create(MessageCollectorActor.class), "topology");
235
236         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
237                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
238         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
239         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
240
241         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
242         final Creator<ShardManager> creator = new Creator<ShardManager>() {
243             private static final long serialVersionUID = 1L;
244             @Override
245             public ShardManager create() throws Exception {
246                 return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig).
247                         datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready).
248                         primaryShardInfoCache(primaryShardInfoCache)) {
249                     @Override
250                     protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
251                         Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
252                         ActorRef ref = null;
253                         if(entry != null) {
254                             ref = entry.getKey();
255                             entry.setValue(info.getDatastoreContext());
256                         }
257
258                         newShardActorLatch.countDown();
259                         return ref;
260                     }
261                 };
262             }
263         };
264
265         JavaTestKit kit = new JavaTestKit(getSystem());
266
267         final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)).
268                     withDispatcher(Dispatchers.DefaultDispatcherId()));
269
270         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
271
272         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
273         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
274                 getShardElectionTimeoutFactor());
275         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
276                 getShardElectionTimeoutFactor());
277
278         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
279                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
280         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
281                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
282
283         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
284                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
285
286         shardManager.tell(newMockFactory, kit.getRef());
287
288         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
289         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
290
291         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
292         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
293
294         defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
295         topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
296     }
297
298     @Test
299     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
300         new JavaTestKit(getSystem()) {{
301             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
302
303             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
304
305             shardManager.tell(new FindPrimary("non-existent", false), getRef());
306
307             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
308         }};
309     }
310
311     @Test
312     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
313         new JavaTestKit(getSystem()) {{
314             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
315
316             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
317
318             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
319             shardManager.tell(new ActorInitialized(), mockShardActor);
320
321             DataTree mockDataTree = mock(DataTree.class);
322             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
323                     DataStoreVersions.CURRENT_VERSION), getRef());
324
325             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
326             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
327                     RaftState.Leader.name())), mockShardActor);
328
329             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
330
331             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
332             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
333                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
334             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
335         }};
336     }
337
338     @Test
339     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
340         new JavaTestKit(getSystem()) {{
341             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
342
343             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
344             shardManager.tell(new ActorInitialized(), mockShardActor);
345
346             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
347             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
348             shardManager.tell(new RoleChangeNotification(memberId1,
349                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
350             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
351
352             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
353
354             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
355         }};
356     }
357
358     @Test
359     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
360         new JavaTestKit(getSystem()) {{
361             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
362
363             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
364             shardManager.tell(new ActorInitialized(), mockShardActor);
365
366             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
367             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
368
369             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
370             shardManager.tell(new RoleChangeNotification(memberId1,
371                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
372             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
373             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
374                     leaderVersion), mockShardActor);
375
376             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
377
378             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
379             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
380                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
381             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
382         }};
383     }
384
385     @Test
386     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
387         new JavaTestKit(getSystem()) {{
388             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
389
390             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
391
392             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
393         }};
394     }
395
396     @Test
397     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
398         new JavaTestKit(getSystem()) {{
399             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
400
401             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
402             shardManager.tell(new ActorInitialized(), mockShardActor);
403
404             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
405
406             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
407         }};
408     }
409
410     @Test
411     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
412         new JavaTestKit(getSystem()) {{
413             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
414
415             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
416             shardManager.tell(new ActorInitialized(), mockShardActor);
417
418             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
419             shardManager.tell(new RoleChangeNotification(memberId,
420                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
421
422             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
423
424             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
425
426             DataTree mockDataTree = mock(DataTree.class);
427             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
428                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
429
430             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
431
432             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
433             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
434                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
435             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
436         }};
437     }
438
439     @Test
440     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
441         new JavaTestKit(getSystem()) {{
442             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
443
444             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
445
446             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
447             // delayed until we send ActorInitialized and RoleChangeNotification.
448             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
449
450             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
451
452             shardManager.tell(new ActorInitialized(), mockShardActor);
453
454             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
455
456             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
457             shardManager.tell(new RoleChangeNotification(memberId,
458                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
459
460             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
461
462             DataTree mockDataTree = mock(DataTree.class);
463             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
464                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
465
466             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
467             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
468                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
469             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
470
471             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
472         }};
473     }
474
475     @Test
476     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
477         new JavaTestKit(getSystem()) {{
478             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
479
480             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
481
482             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
483
484             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
485
486             shardManager.tell(new ActorInitialized(), mockShardActor);
487
488             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
489         }};
490     }
491
492     @Test
493     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
494         new JavaTestKit(getSystem()) {{
495             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
496
497             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
498             shardManager.tell(new ActorInitialized(), mockShardActor);
499             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
500                     null, RaftState.Candidate.name()), mockShardActor);
501
502             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
503
504             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
505         }};
506     }
507
508     @Test
509     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
510         new JavaTestKit(getSystem()) {{
511             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
512
513             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
514             shardManager.tell(new ActorInitialized(), mockShardActor);
515             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
516                     null, RaftState.IsolatedLeader.name()), mockShardActor);
517
518             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
519
520             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
521         }};
522     }
523
524     @Test
525     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
526         new JavaTestKit(getSystem()) {{
527             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
528
529             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
530             shardManager.tell(new ActorInitialized(), mockShardActor);
531
532             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
533
534             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
535         }};
536     }
537
538     @Test
539     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
540         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
541
542         // Create an ActorSystem ShardManager actor for member-1.
543
544         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
545         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
546
547         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
548
549         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
550                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
551                         new MockConfiguration()), shardManagerID);
552
553         // Create an ActorSystem ShardManager actor for member-2.
554
555         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
556
557         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
558
559         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
560
561         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
562                 put("default", Arrays.asList("member-1", "member-2")).
563                 put("astronauts", Arrays.asList("member-2")).build());
564
565         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
566                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
567                         mockConfig2), shardManagerID);
568
569         new JavaTestKit(system1) {{
570
571             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
572             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
573
574             shardManager2.tell(new ActorInitialized(), mockShardActor2);
575
576             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
577             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
578             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
579                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
580             shardManager2.tell(new RoleChangeNotification(memberId2,
581                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
582
583             shardManager1.underlyingActor().waitForMemberUp();
584
585             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
586
587             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
588             String path = found.getPrimaryPath();
589             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
590             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
591
592             shardManager2.underlyingActor().verifyFindPrimary();
593
594             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
595
596             shardManager1.underlyingActor().waitForMemberRemoved();
597
598             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
599
600             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
601         }};
602
603         JavaTestKit.shutdownActorSystem(system1);
604         JavaTestKit.shutdownActorSystem(system2);
605     }
606
607     @Test
608     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
609         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
610
611         // Create an ActorSystem ShardManager actor for member-1.
612
613         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
614         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
615
616         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
617
618         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
619             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
620                 new MockConfiguration()), shardManagerID);
621
622         // Create an ActorSystem ShardManager actor for member-2.
623
624         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
625
626         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
627
628         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
629
630         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
631             put("default", Arrays.asList("member-1", "member-2")).build());
632
633         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
634             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
635                 mockConfig2), shardManagerID);
636
637         new JavaTestKit(system1) {{
638
639             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
640             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
641             shardManager1.tell(new ActorInitialized(), mockShardActor1);
642             shardManager2.tell(new ActorInitialized(), mockShardActor2);
643
644             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
645             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
646             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
647                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
648             shardManager1.tell(new RoleChangeNotification(memberId1,
649                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
650             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
651                     DataStoreVersions.CURRENT_VERSION),
652                 mockShardActor2);
653             shardManager2.tell(new RoleChangeNotification(memberId2,
654                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
655             shardManager1.underlyingActor().waitForMemberUp();
656
657             shardManager1.tell(new FindPrimary("default", true), getRef());
658
659             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
660             String path = found.getPrimaryPath();
661             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
662
663             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
664                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
665
666             shardManager1.underlyingActor().waitForUnreachableMember();
667
668             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
669             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
670             MessageCollectorActor.clearMessages(mockShardActor1);
671
672             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
673                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
674
675             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
676
677             shardManager1.tell(new FindPrimary("default", true), getRef());
678
679             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
680
681             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
682                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
683
684             shardManager1.underlyingActor().waitForReachableMember();
685
686             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
687             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
688             MessageCollectorActor.clearMessages(mockShardActor1);
689
690             shardManager1.tell(new FindPrimary("default", true), getRef());
691
692             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
693             String path1 = found1.getPrimaryPath();
694             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
695
696             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
697                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
698
699             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
700
701         }};
702
703         JavaTestKit.shutdownActorSystem(system1);
704         JavaTestKit.shutdownActorSystem(system2);
705     }
706
707     @Test
708     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
709         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
710
711         // Create an ActorSystem ShardManager actor for member-1.
712
713         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
714         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
715
716         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
717
718         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
719             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
720                 new MockConfiguration()), shardManagerID);
721
722         // Create an ActorSystem ShardManager actor for member-2.
723
724         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
725
726         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
727
728         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
729
730         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
731             put("default", Arrays.asList("member-1", "member-2")).build());
732
733         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
734             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
735                 mockConfig2), shardManagerID);
736
737         new JavaTestKit(system1) {{
738
739             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
740             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
741             shardManager1.tell(new ActorInitialized(), mockShardActor1);
742             shardManager2.tell(new ActorInitialized(), mockShardActor2);
743
744             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
745             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
746             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
747                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
748             shardManager1.tell(new RoleChangeNotification(memberId1,
749                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
750             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
751                     DataStoreVersions.CURRENT_VERSION),
752                 mockShardActor2);
753             shardManager2.tell(new RoleChangeNotification(memberId2,
754                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
755             shardManager1.underlyingActor().waitForMemberUp();
756
757             shardManager1.tell(new FindPrimary("default", true), getRef());
758
759             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
760             String path = found.getPrimaryPath();
761             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
762
763             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
764                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
765
766             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
767                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
768
769             shardManager1.underlyingActor().waitForUnreachableMember();
770
771             shardManager1.tell(new FindPrimary("default", true), getRef());
772
773             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
774
775             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
776
777             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
778                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
779             shardManager1.tell(new RoleChangeNotification(memberId1,
780                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
781
782             shardManager1.tell(new FindPrimary("default", true), getRef());
783
784             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
785             String path1 = found1.getPrimaryPath();
786             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
787
788         }};
789
790         JavaTestKit.shutdownActorSystem(system1);
791         JavaTestKit.shutdownActorSystem(system2);
792     }
793
794
795     @Test
796     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
797         new JavaTestKit(getSystem()) {{
798             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
799
800             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
801
802             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
803
804             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
805
806             assertEquals("getShardName", "non-existent", notFound.getShardName());
807         }};
808     }
809
810     @Test
811     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
812         new JavaTestKit(getSystem()) {{
813             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
814
815             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
816             shardManager.tell(new ActorInitialized(), mockShardActor);
817
818             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
819
820             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
821
822             assertTrue("Found path contains " + found.getPath().path().toString(),
823                     found.getPath().path().toString().contains("member-1-shard-default-config"));
824         }};
825     }
826
827     @Test
828     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
829         new JavaTestKit(getSystem()) {{
830             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
831
832             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
833
834             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
835         }};
836     }
837
838     @Test
839     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
840         new JavaTestKit(getSystem()) {{
841             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
842
843             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
844
845             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
846             // delayed until we send ActorInitialized.
847             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
848                     new Timeout(5, TimeUnit.SECONDS));
849
850             shardManager.tell(new ActorInitialized(), mockShardActor);
851
852             Object resp = Await.result(future, duration("5 seconds"));
853             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
854         }};
855     }
856
857     @Test
858     public void testOnRecoveryJournalIsCleaned() {
859         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
860         InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
861                 ImmutableSet.of("foo")));
862         InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
863                 ImmutableSet.of("bar")));
864         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
865
866         TestShardManager shardManager = newTestShardManager();
867
868         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
869
870         // Journal entries up to the last one should've been deleted
871         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
872         synchronized (journal) {
873             assertEquals("Journal size", 0, journal.size());
874         }
875     }
876
877     @Test
878     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
879         TestShardManager shardManager = newTestShardManager();
880
881         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
882         shardManager.onReceiveCommand(new RoleChangeNotification(
883                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
884
885         verify(ready, never()).countDown();
886
887         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
888                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
889
890         verify(ready, times(1)).countDown();
891     }
892
893     @Test
894     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
895         new JavaTestKit(getSystem()) {{
896             TestShardManager shardManager = newTestShardManager();
897
898             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
899             shardManager.onReceiveCommand(new RoleChangeNotification(
900                     memberId, null, RaftState.Follower.name()));
901
902             verify(ready, never()).countDown();
903
904             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
905
906             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
907                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
908                     DataStoreVersions.CURRENT_VERSION));
909
910             verify(ready, times(1)).countDown();
911         }};
912     }
913
914     @Test
915     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
916         new JavaTestKit(getSystem()) {{
917             TestShardManager shardManager = newTestShardManager();
918
919             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
920             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
921
922             verify(ready, never()).countDown();
923
924             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
925                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
926                     DataStoreVersions.CURRENT_VERSION));
927
928             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
929
930             verify(ready, times(1)).countDown();
931         }};
932     }
933
934     @Test
935     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
936         TestShardManager shardManager = newTestShardManager();
937
938         shardManager.onReceiveCommand(new RoleChangeNotification(
939                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
940
941         verify(ready, never()).countDown();
942     }
943
944     @Test
945     public void testByDefaultSyncStatusIsFalse() throws Exception{
946         TestShardManager shardManager = newTestShardManager();
947
948         assertEquals(false, shardManager.getMBean().getSyncStatus());
949     }
950
951     @Test
952     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
953         TestShardManager shardManager = newTestShardManager();
954
955         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
956                 RaftState.Follower.name(), RaftState.Leader.name()));
957
958         assertEquals(true, shardManager.getMBean().getSyncStatus());
959     }
960
961     @Test
962     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
963         TestShardManager shardManager = newTestShardManager();
964
965         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
966         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
967                 RaftState.Follower.name(), RaftState.Candidate.name()));
968
969         assertEquals(false, shardManager.getMBean().getSyncStatus());
970
971         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
972         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
973                 true, shardId));
974
975         assertEquals(false, shardManager.getMBean().getSyncStatus());
976     }
977
978     @Test
979     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
980         TestShardManager shardManager = newTestShardManager();
981
982         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
983         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
984                 RaftState.Candidate.name(), RaftState.Follower.name()));
985
986         // Initially will be false
987         assertEquals(false, shardManager.getMBean().getSyncStatus());
988
989         // Send status true will make sync status true
990         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
991
992         assertEquals(true, shardManager.getMBean().getSyncStatus());
993
994         // Send status false will make sync status false
995         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
996
997         assertEquals(false, shardManager.getMBean().getSyncStatus());
998
999     }
1000
1001     @Test
1002     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1003         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1004             @Override
1005             public List<String> getMemberShardNames(String memberName) {
1006                 return Arrays.asList("default", "astronauts");
1007             }
1008         }));
1009
1010         // Initially will be false
1011         assertEquals(false, shardManager.getMBean().getSyncStatus());
1012
1013         // Make default shard leader
1014         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1015         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1016                 RaftState.Follower.name(), RaftState.Leader.name()));
1017
1018         // default = Leader, astronauts is unknown so sync status remains false
1019         assertEquals(false, shardManager.getMBean().getSyncStatus());
1020
1021         // Make astronauts shard leader as well
1022         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1023         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1024                 RaftState.Follower.name(), RaftState.Leader.name()));
1025
1026         // Now sync status should be true
1027         assertEquals(true, shardManager.getMBean().getSyncStatus());
1028
1029         // Make astronauts a Follower
1030         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1031                 RaftState.Leader.name(), RaftState.Follower.name()));
1032
1033         // Sync status is not true
1034         assertEquals(false, shardManager.getMBean().getSyncStatus());
1035
1036         // Make the astronauts follower sync status true
1037         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1038
1039         // Sync status is now true
1040         assertEquals(true, shardManager.getMBean().getSyncStatus());
1041
1042     }
1043
1044     @Test
1045     public void testOnReceiveSwitchShardBehavior() throws Exception {
1046         new JavaTestKit(getSystem()) {{
1047             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
1048
1049             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1050             shardManager.tell(new ActorInitialized(), mockShardActor);
1051
1052             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
1053
1054             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1055
1056             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1057             assertEquals(1000, switchBehavior.getNewTerm());
1058         }};
1059     }
1060
1061     @Test
1062     public void testOnCreateShard() {
1063         new JavaTestKit(getSystem()) {{
1064             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1065
1066             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1067                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1068
1069             SchemaContext schemaContext = TestModel.createTestContext();
1070             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1071
1072             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1073                     persistent(false).build();
1074             Shard.Builder shardBuilder = Shard.builder();
1075
1076             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1077                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1078             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1079
1080             expectMsgClass(duration("5 seconds"), Success.class);
1081
1082             shardManager.tell(new FindLocalShard("foo", true), getRef());
1083
1084             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1085
1086             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1087             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1088                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1089             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1090                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1091                     shardBuilder.getPeerAddresses().keySet());
1092             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1093                     shardBuilder.getId());
1094             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1095
1096             // Send CreateShard with same name - should return Success with a message.
1097
1098             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1099
1100             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1101             assertNotNull("Success status is null", success.status());
1102         }};
1103     }
1104
1105     @Test
1106     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1107         new JavaTestKit(getSystem()) {{
1108             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1109
1110             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1111                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1112
1113             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1114
1115             Shard.Builder shardBuilder = Shard.builder();
1116             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1117                     "foo", null, Arrays.asList("member-5", "member-6"));
1118
1119             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1120             expectMsgClass(duration("5 seconds"), Success.class);
1121
1122             shardManager.tell(new FindLocalShard("foo", true), getRef());
1123             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1124
1125             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1126             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1127                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1128         }};
1129     }
1130
1131     @Test
1132     public void testOnCreateShardWithNoInitialSchemaContext() {
1133         new JavaTestKit(getSystem()) {{
1134             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1135                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1136
1137             Shard.Builder shardBuilder = Shard.builder();
1138
1139             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1140                     "foo", null, Arrays.asList("member-1"));
1141             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1142
1143             expectMsgClass(duration("5 seconds"), Success.class);
1144
1145             SchemaContext schemaContext = TestModel.createTestContext();
1146             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1147
1148             shardManager.tell(new FindLocalShard("foo", true), getRef());
1149
1150             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1151
1152             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1153             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1154         }};
1155     }
1156
1157     @Test
1158     public void testGetSnapshot() throws Throwable {
1159         JavaTestKit kit = new JavaTestKit(getSystem());
1160
1161         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1162                    put("shard1", Arrays.asList("member-1")).
1163                    put("shard2", Arrays.asList("member-1")).build());
1164
1165         ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher(
1166                 Dispatchers.DefaultDispatcherId()));
1167
1168         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1169         Failure failure = kit.expectMsgClass(Failure.class);
1170         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1171
1172         kit = new JavaTestKit(getSystem());
1173
1174         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1175
1176         shardManager.tell(new FindLocalShard("shard1", true), kit.getRef());
1177         kit.expectMsgClass(LocalShardFound.class);
1178         shardManager.tell(new FindLocalShard("shard2", true), kit.getRef());
1179         kit.expectMsgClass(LocalShardFound.class);
1180
1181         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1182
1183         DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
1184
1185         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1186         List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
1187         Set<String> actualShardNames = new HashSet<>();
1188         for(ShardSnapshot s: shardSnapshots) {
1189             actualShardNames.add(s.getName());
1190         }
1191
1192         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames);
1193
1194         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
1195     }
1196
1197     @Test
1198     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1199         new JavaTestKit(getSystem()) {{
1200             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1201                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1202
1203             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1204             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1205
1206             assertEquals("Failure obtained", true,
1207                           (resp.cause() instanceof IllegalArgumentException));
1208         }};
1209     }
1210
1211     @Test
1212     public void testAddShardReplica() throws Exception {
1213         MockConfiguration mockConfig =
1214                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1215                    put("default", Arrays.asList("member-1", "member-2")).
1216                    put("astronauts", Arrays.asList("member-2")).build());
1217
1218         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1219
1220         // Create an ActorSystem ShardManager actor for member-1.
1221         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1222         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1223         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1224         final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1225                 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1226                    new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1227
1228         // Create an ActorSystem ShardManager actor for member-2.
1229         final ActorSystem system2 = ActorSystem.create("cluster-test",
1230             ConfigFactory.load().getConfig("Member2"));
1231         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1232
1233         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1234         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1235             TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1236         final TestActorRef<ForwardingShardManager> leaderShardManager = TestActorRef.create(system2,
1237                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor,
1238                         new ClusterWrapperImpl(system2), mockConfig), shardManagerID);
1239
1240         new JavaTestKit(system1) {{
1241
1242             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1243             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1244
1245             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1246
1247             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1248             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1249             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1250                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1251             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1252                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1253
1254             newReplicaShardManager.underlyingActor().waitForMemberUp();
1255             leaderShardManager.underlyingActor().waitForMemberUp();
1256
1257             //construct a mock response message
1258             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1259             mockShardLeaderActor.underlyingActor().updateResponse(response);
1260             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1261             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1262                 AddServer.class);
1263             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1264             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1265             newReplicaShardManager.underlyingActor()
1266                 .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
1267             expectMsgClass(duration("5 seconds"), Status.Success.class);
1268         }};
1269
1270         JavaTestKit.shutdownActorSystem(system1);
1271         JavaTestKit.shutdownActorSystem(system2);
1272     }
1273
1274     @Test
1275     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1276         new JavaTestKit(getSystem()) {{
1277             TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
1278                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1279
1280             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1281             shardManager.tell(new ActorInitialized(), mockShardActor);
1282
1283             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1284             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1285             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1286                     Props.create(MockRespondActor.class, addServerReply), leaderId);
1287
1288             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1289
1290             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1291             shardManager.tell(new RoleChangeNotification(newReplicaId,
1292                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1293             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
1294                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1295
1296             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1297
1298             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1299
1300             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1301             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1302
1303             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1304             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1305
1306             // Send message again to verify previous in progress state is cleared
1307
1308             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1309             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1310             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1311
1312             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1313
1314             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1315                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1316             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1317             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1318             expectMsgClass(duration("5 seconds"), Failure.class);
1319
1320             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1321             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1322
1323             leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
1324         }};
1325     }
1326
1327     @Test
1328     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1329         new JavaTestKit(getSystem()) {{
1330             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1331             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
1332
1333             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1334             shardManager.tell(new ActorInitialized(), mockShardActor);
1335             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1336                     DataStoreVersions.CURRENT_VERSION), getRef());
1337             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1338                     RaftState.Leader.name())), mockShardActor);
1339
1340             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1341             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1342             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1343
1344             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1345             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1346         }};
1347     }
1348
1349     @Test
1350     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1351         new JavaTestKit(getSystem()) {{
1352             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1353
1354             MockConfiguration mockConfig =
1355                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1356                        put("astronauts", Arrays.asList("member-2")).build());
1357
1358             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1359             TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
1360                     newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockNewReplicaShardActor,
1361                             new MockClusterWrapper(), mockConfig), shardMgrID);
1362             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1363
1364             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1365
1366             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1367             terminateWatcher.watch(mockNewReplicaShardActor);
1368
1369             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1370
1371             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1372             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1373                     addServerMsg.getNewServerId());
1374             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1375
1376             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1377             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1378
1379             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1380             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1381
1382             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1383
1384             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1385             mockShardLeaderKit.expectMsgClass(AddServer.class);
1386             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1387             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1388             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1389         }};
1390     }
1391
1392     @Test
1393     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1394         new JavaTestKit(getSystem()) {{
1395             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1396             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1397
1398             MockConfiguration mockConfig =
1399                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1400                        put("astronauts", Arrays.asList("member-2")).build());
1401
1402             TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
1403                     newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockShardActor,
1404                             new MockClusterWrapper(), mockConfig), shardMgrID);
1405             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1406
1407             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1408
1409             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1410
1411             mockShardLeaderKit.expectMsgClass(AddServer.class);
1412
1413             shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
1414
1415             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1416         }};
1417     }
1418
1419     @Test
1420     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1421         new JavaTestKit(getSystem()) {{
1422             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1423                        put("astronauts", Arrays.asList("member-2")).build());
1424
1425             ActorRef newReplicaShardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(
1426                     "shardManager", mockShardActor, new MockClusterWrapper(), mockConfig));
1427
1428             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1429             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1430
1431             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1432             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1433             assertEquals("Failure obtained", true,
1434                           (resp.cause() instanceof RuntimeException));
1435         }};
1436     }
1437
1438     @Test
1439     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1440         new JavaTestKit(getSystem()) {{
1441             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1442                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1443
1444             shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
1445             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1446             assertEquals("Failure obtained", true,
1447                          (resp.cause() instanceof IllegalArgumentException));
1448         }};
1449
1450     }
1451
1452     @Test
1453     public void testShardPersistenceWithRestoredData() throws Exception {
1454         new JavaTestKit(getSystem()) {{
1455             MockConfiguration mockConfig =
1456                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1457                    put("default", Arrays.asList("member-1", "member-2")).
1458                    put("astronauts", Arrays.asList("member-2")).
1459                    put("people", Arrays.asList("member-1", "member-2")).build());
1460             String[] restoredShards = {"default", "astronauts"};
1461             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1462             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1463
1464             //create shardManager to come up with restored data
1465             TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
1466                     newShardMgrProps(mockConfig));
1467
1468             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1469
1470             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1471             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1472             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1473
1474             //Verify a local shard is created for the restored shards,
1475             //although we expect a NotInitializedException for the shards as the actor initialization
1476             //message is not sent for them
1477             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1478             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1479
1480             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1481             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1482         }};
1483     }
1484
1485
1486     private static class TestShardManager extends ShardManager {
1487         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1488
1489         private TestShardManager(Builder builder) {
1490             super(builder);
1491         }
1492
1493         @Override
1494         public void handleRecover(Object message) throws Exception {
1495             try {
1496                 super.handleRecover(message);
1497             } finally {
1498                 if(message instanceof RecoveryCompleted) {
1499                     recoveryComplete.countDown();
1500                 }
1501             }
1502         }
1503
1504         void waitForRecoveryComplete() {
1505             assertEquals("Recovery complete", true,
1506                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1507         }
1508
1509         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
1510             return new Builder(datastoreContextBuilder);
1511         }
1512
1513         private static class Builder extends ShardManager.Builder {
1514             Builder(DatastoreContext.Builder datastoreContextBuilder) {
1515                 cluster(new MockClusterWrapper()).configuration(new MockConfiguration());
1516                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
1517                 waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
1518             }
1519
1520             @Override
1521             public Props props() {
1522                 verify();
1523                 return Props.create(TestShardManager.class, this);
1524             }
1525         }
1526     }
1527
1528     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1529         private static final long serialVersionUID = 1L;
1530         private final Creator<ShardManager> delegate;
1531
1532         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1533             this.delegate = delegate;
1534         }
1535
1536         @Override
1537         public ShardManager create() throws Exception {
1538             return delegate.create();
1539         }
1540     }
1541
1542     interface MessageInterceptor extends Function<Object, Object> {
1543         boolean canIntercept(Object message);
1544     }
1545
1546     private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
1547         return new MessageInterceptor(){
1548             @Override
1549             public Object apply(Object message) {
1550                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
1551             }
1552
1553             @Override
1554             public boolean canIntercept(Object message) {
1555                 return message instanceof FindPrimary;
1556             }
1557         };
1558     }
1559
1560     private static class ForwardingShardManager extends ShardManager {
1561         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1562         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1563         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1564         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1565         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1566         private final ActorRef shardActor;
1567         private final String name;
1568         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1569         private ShardManagerSnapshot snapshot;
1570         private volatile MessageInterceptor messageInterceptor;
1571
1572         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
1573             super(builder);
1574             this.shardActor = shardActor;
1575             this.name = name;
1576         }
1577
1578         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
1579             this.messageInterceptor = messageInterceptor;
1580         }
1581
1582
1583         @Override
1584         public void handleCommand(Object message) throws Exception {
1585             try{
1586                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
1587                     getSender().tell(messageInterceptor.apply(message), getSelf());
1588                 } else {
1589                     super.handleCommand(message);
1590                 }
1591             } finally {
1592                 if(message instanceof FindPrimary) {
1593                     findPrimaryMessageReceived.countDown();
1594                 } else if(message instanceof ClusterEvent.MemberUp) {
1595                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1596                     if(!getCluster().getCurrentMemberName().equals(role)) {
1597                         memberUpReceived.countDown();
1598                     }
1599                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1600                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1601                     if(!getCluster().getCurrentMemberName().equals(role)) {
1602                         memberRemovedReceived.countDown();
1603                     }
1604                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1605                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1606                     if(!getCluster().getCurrentMemberName().equals(role)) {
1607                         memberUnreachableReceived.countDown();
1608                     }
1609                 } else if(message instanceof ClusterEvent.ReachableMember) {
1610                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1611                     if(!getCluster().getCurrentMemberName().equals(role)) {
1612                         memberReachableReceived.countDown();
1613                     }
1614                 }
1615             }
1616         }
1617
1618         @Override
1619         public String persistenceId() {
1620             return name;
1621         }
1622
1623         @Override
1624         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1625             return shardActor;
1626         }
1627
1628         void waitForMemberUp() {
1629             assertEquals("MemberUp received", true,
1630                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1631             memberUpReceived = new CountDownLatch(1);
1632         }
1633
1634         void waitForMemberRemoved() {
1635             assertEquals("MemberRemoved received", true,
1636                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1637             memberRemovedReceived = new CountDownLatch(1);
1638         }
1639
1640         void waitForUnreachableMember() {
1641             assertEquals("UnreachableMember received", true,
1642                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1643                 ));
1644             memberUnreachableReceived = new CountDownLatch(1);
1645         }
1646
1647         void waitForReachableMember() {
1648             assertEquals("ReachableMember received", true,
1649                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1650             memberReachableReceived = new CountDownLatch(1);
1651         }
1652
1653         void verifyFindPrimary() {
1654             assertEquals("FindPrimary received", true,
1655                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1656             findPrimaryMessageReceived = new CountDownLatch(1);
1657         }
1658
1659         @Override
1660         public void saveSnapshot(Object obj) {
1661             snapshot = (ShardManagerSnapshot) obj;
1662             snapshotPersist.countDown();
1663         }
1664
1665         void verifySnapshotPersisted(Set<String> shardList) {
1666             assertEquals("saveSnapshot invoked", true,
1667                 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
1668             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
1669         }
1670     }
1671
1672     private static class MockRespondActor extends MessageCollectorActor {
1673         static final String CLEAR_RESPONSE = "clear-response";
1674
1675         private volatile Object responseMsg;
1676
1677         @SuppressWarnings("unused")
1678         public MockRespondActor() {
1679         }
1680
1681         @SuppressWarnings("unused")
1682         public MockRespondActor(Object responseMsg) {
1683             this.responseMsg = responseMsg;
1684         }
1685
1686         public void updateResponse(Object response) {
1687             responseMsg = response;
1688         }
1689
1690         @Override
1691         public void onReceive(Object message) throws Exception {
1692             super.onReceive(message);
1693             if (message instanceof AddServer) {
1694                 if (responseMsg != null) {
1695                     getSender().tell(responseMsg, getSelf());
1696                 }
1697             } if(message.equals(CLEAR_RESPONSE)) {
1698                 responseMsg = null;
1699             }
1700         }
1701     }
1702 }