BUG 2187 - Persisting shard list in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.cluster.Cluster;
28 import akka.cluster.ClusterEvent;
29 import akka.dispatch.Dispatchers;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.RecoveryCompleted;
33 import akka.testkit.JavaTestKit;
34 import akka.testkit.TestActorRef;
35 import akka.util.Timeout;
36 import com.google.common.base.Optional;
37 import com.google.common.collect.ImmutableMap;
38 import com.google.common.collect.ImmutableSet;
39 import com.google.common.collect.Sets;
40 import com.google.common.util.concurrent.Uninterruptibles;
41 import com.typesafe.config.ConfigFactory;
42 import java.net.URI;
43 import java.util.AbstractMap;
44 import java.util.Arrays;
45 import java.util.Collection;
46 import java.util.Collections;
47 import java.util.HashMap;
48 import java.util.HashSet;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Map.Entry;
52 import java.util.Set;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.TimeUnit;
55 import org.junit.After;
56 import org.junit.Before;
57 import org.junit.Test;
58 import org.mockito.Mock;
59 import org.mockito.Mockito;
60 import org.mockito.MockitoAnnotations;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
63 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
64 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
67 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
68 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
69 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
70 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
71 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
72 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
73 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
74 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
75 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
76 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
77 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
78 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
79 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
80 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
81 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
82 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
83 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
84 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
85 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
86 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
87 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
88 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
89 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
90 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
91 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
92 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
93 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
94 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
95 import org.opendaylight.controller.cluster.raft.RaftState;
96 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
97 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
98 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
99 import org.opendaylight.controller.cluster.raft.messages.AddServer;
100 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
101 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
102 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
103 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
104 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
105 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
108 import scala.concurrent.Await;
109 import scala.concurrent.Future;
110 import scala.concurrent.duration.FiniteDuration;
111
112 public class ShardManagerTest extends AbstractActorTest {
113     private static int ID_COUNTER = 1;
114
115     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
116     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
117
118     @Mock
119     private static CountDownLatch ready;
120
121     private static TestActorRef<MessageCollectorActor> mockShardActor;
122
123     private static String mockShardName;
124
125     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
126             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
127                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
128
129     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
130         String name = new ShardIdentifier(shardName, memberName,"config").toString();
131         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
132     }
133
134     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
135
136     @Before
137     public void setUp() {
138         MockitoAnnotations.initMocks(this);
139
140         InMemoryJournal.clear();
141         InMemorySnapshotStore.clear();
142
143         if(mockShardActor == null) {
144             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
145             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
146         }
147
148         mockShardActor.underlyingActor().clear();
149     }
150
151     @After
152     public void tearDown() {
153         InMemoryJournal.clear();
154         InMemorySnapshotStore.clear();
155     }
156
157     private Props newShardMgrProps() {
158         return newShardMgrProps(new MockConfiguration());
159     }
160
161     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
162         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
163         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
164         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
165         return mockFactory;
166     }
167
168     private Props newShardMgrProps(Configuration config) {
169         return TestShardManager.builder(datastoreContextBuilder).configuration(config).props();
170     }
171
172     private Props newPropsShardMgrWithMockShardActor() {
173         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
174                 new MockConfiguration());
175     }
176
177     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
178             final ClusterWrapper clusterWrapper, final Configuration config) {
179         Creator<ShardManager> creator = new Creator<ShardManager>() {
180             private static final long serialVersionUID = 1L;
181             @Override
182             public ShardManager create() throws Exception {
183                 return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config).
184                         datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
185                         waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor);
186             }
187         };
188
189         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
190     }
191
192     private TestShardManager newTestShardManager() {
193         return newTestShardManager(newShardMgrProps());
194     }
195
196     private TestShardManager newTestShardManager(Props props) {
197         TestActorRef<TestShardManager> shardManagerActor = TestActorRef.create(getSystem(), props);
198         TestShardManager shardManager = shardManagerActor.underlyingActor();
199         shardManager.waitForRecoveryComplete();
200         return shardManager;
201     }
202
203     @Test
204     public void testPerShardDatastoreContext() throws Exception {
205         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
206                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
207
208         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
209                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
210
211         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
212                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
213
214         final MockConfiguration mockConfig = new MockConfiguration() {
215             @Override
216             public Collection<String> getMemberShardNames(String memberName) {
217                 return Arrays.asList("default", "topology");
218             }
219
220             @Override
221             public Collection<String> getMembersFromShardName(String shardName) {
222                 return Arrays.asList("member-1");
223             }
224         };
225
226         final TestActorRef<MessageCollectorActor> defaultShardActor = TestActorRef.create(getSystem(),
227                 Props.create(MessageCollectorActor.class), "default");
228         final TestActorRef<MessageCollectorActor> topologyShardActor = TestActorRef.create(getSystem(),
229                 Props.create(MessageCollectorActor.class), "topology");
230
231         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
232                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
233         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
234         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
235
236         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
237         final Creator<ShardManager> creator = new Creator<ShardManager>() {
238             private static final long serialVersionUID = 1L;
239             @Override
240             public ShardManager create() throws Exception {
241                 return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig).
242                         datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready).
243                         primaryShardInfoCache(primaryShardInfoCache)) {
244                     @Override
245                     protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
246                         Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
247                         ActorRef ref = null;
248                         if(entry != null) {
249                             ref = entry.getKey();
250                             entry.setValue(info.getDatastoreContext());
251                         }
252
253                         newShardActorLatch.countDown();
254                         return ref;
255                     }
256                 };
257             }
258         };
259
260         JavaTestKit kit = new JavaTestKit(getSystem());
261
262         final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)).
263                     withDispatcher(Dispatchers.DefaultDispatcherId()));
264
265         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
266
267         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
268         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
269                 getShardElectionTimeoutFactor());
270         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
271                 getShardElectionTimeoutFactor());
272
273         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
274                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
275         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
276                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
277
278         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
279                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
280
281         shardManager.tell(newMockFactory, kit.getRef());
282
283         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
284         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
285
286         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
287         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
288
289         defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
290         topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
291     }
292
293     @Test
294     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
295         new JavaTestKit(getSystem()) {{
296             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
297
298             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
299
300             shardManager.tell(new FindPrimary("non-existent", false), getRef());
301
302             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
303         }};
304     }
305
306     @Test
307     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
308         new JavaTestKit(getSystem()) {{
309             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
310
311             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
312
313             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
314             shardManager.tell(new ActorInitialized(), mockShardActor);
315
316             DataTree mockDataTree = mock(DataTree.class);
317             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
318                     DataStoreVersions.CURRENT_VERSION), getRef());
319
320             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
321             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
322                     RaftState.Leader.name())), mockShardActor);
323
324             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
325
326             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
327             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
328                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
329             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
330         }};
331     }
332
333     @Test
334     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
335         new JavaTestKit(getSystem()) {{
336             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
337
338             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
339             shardManager.tell(new ActorInitialized(), mockShardActor);
340
341             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
342             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
343             shardManager.tell(new RoleChangeNotification(memberId1,
344                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
345             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
346
347             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
348
349             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
350         }};
351     }
352
353     @Test
354     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
355         new JavaTestKit(getSystem()) {{
356             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
357
358             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
359             shardManager.tell(new ActorInitialized(), mockShardActor);
360
361             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
362             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
363
364             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
365             shardManager.tell(new RoleChangeNotification(memberId1,
366                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
367             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
368             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
369                     leaderVersion), mockShardActor);
370
371             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
372
373             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
374             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
375                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
376             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
377         }};
378     }
379
380     @Test
381     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
382         new JavaTestKit(getSystem()) {{
383             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
384
385             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
386
387             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
388         }};
389     }
390
391     @Test
392     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
393         new JavaTestKit(getSystem()) {{
394             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
395
396             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
397             shardManager.tell(new ActorInitialized(), mockShardActor);
398
399             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
400
401             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
402         }};
403     }
404
405     @Test
406     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
407         new JavaTestKit(getSystem()) {{
408             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
409
410             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
411             shardManager.tell(new ActorInitialized(), mockShardActor);
412
413             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
414             shardManager.tell(new RoleChangeNotification(memberId,
415                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
416
417             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
418
419             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
420
421             DataTree mockDataTree = mock(DataTree.class);
422             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
423                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
424
425             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
426
427             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
428             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
429                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
430             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
431         }};
432     }
433
434     @Test
435     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
436         new JavaTestKit(getSystem()) {{
437             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
438
439             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
440
441             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
442             // delayed until we send ActorInitialized and RoleChangeNotification.
443             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
444
445             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
446
447             shardManager.tell(new ActorInitialized(), mockShardActor);
448
449             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
450
451             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
452             shardManager.tell(new RoleChangeNotification(memberId,
453                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
454
455             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
456
457             DataTree mockDataTree = mock(DataTree.class);
458             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
459                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
460
461             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
462             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
463                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
464             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
465
466             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
467         }};
468     }
469
470     @Test
471     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
472         new JavaTestKit(getSystem()) {{
473             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
474
475             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
476
477             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
478
479             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
480
481             shardManager.tell(new ActorInitialized(), mockShardActor);
482
483             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
484         }};
485     }
486
487     @Test
488     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
489         new JavaTestKit(getSystem()) {{
490             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
491
492             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
493             shardManager.tell(new ActorInitialized(), mockShardActor);
494             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
495                     null, RaftState.Candidate.name()), mockShardActor);
496
497             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
498
499             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
500         }};
501     }
502
503     @Test
504     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
505         new JavaTestKit(getSystem()) {{
506             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
507
508             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
509             shardManager.tell(new ActorInitialized(), mockShardActor);
510             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
511                     null, RaftState.IsolatedLeader.name()), mockShardActor);
512
513             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
514
515             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
516         }};
517     }
518
519     @Test
520     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
521         new JavaTestKit(getSystem()) {{
522             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
523
524             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
525             shardManager.tell(new ActorInitialized(), mockShardActor);
526
527             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
528
529             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
530         }};
531     }
532
533     @Test
534     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
535         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
536
537         // Create an ActorSystem ShardManager actor for member-1.
538
539         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
540         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
541
542         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
543
544         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
545                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
546                         new MockConfiguration()), shardManagerID);
547
548         // Create an ActorSystem ShardManager actor for member-2.
549
550         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
551
552         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
553
554         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
555
556         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
557                 put("default", Arrays.asList("member-1", "member-2")).
558                 put("astronauts", Arrays.asList("member-2")).build());
559
560         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
561                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
562                         mockConfig2), shardManagerID);
563
564         new JavaTestKit(system1) {{
565
566             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
567             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
568
569             shardManager2.tell(new ActorInitialized(), mockShardActor2);
570
571             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
572             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
573             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
574                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
575             shardManager2.tell(new RoleChangeNotification(memberId2,
576                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
577
578             shardManager1.underlyingActor().waitForMemberUp();
579
580             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
581
582             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
583             String path = found.getPrimaryPath();
584             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
585             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
586
587             shardManager2.underlyingActor().verifyFindPrimary();
588
589             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
590
591             shardManager1.underlyingActor().waitForMemberRemoved();
592
593             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
594
595             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
596         }};
597
598         JavaTestKit.shutdownActorSystem(system1);
599         JavaTestKit.shutdownActorSystem(system2);
600     }
601
602     @Test
603     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
604         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
605
606         // Create an ActorSystem ShardManager actor for member-1.
607
608         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
609         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
610
611         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
612
613         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
614             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
615                 new MockConfiguration()), shardManagerID);
616
617         // Create an ActorSystem ShardManager actor for member-2.
618
619         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
620
621         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
622
623         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
624
625         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
626             put("default", Arrays.asList("member-1", "member-2")).build());
627
628         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
629             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
630                 mockConfig2), shardManagerID);
631
632         new JavaTestKit(system1) {{
633
634             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
635             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
636             shardManager1.tell(new ActorInitialized(), mockShardActor1);
637             shardManager2.tell(new ActorInitialized(), mockShardActor2);
638
639             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
640             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
641             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
642                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
643             shardManager1.tell(new RoleChangeNotification(memberId1,
644                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
645             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
646                     DataStoreVersions.CURRENT_VERSION),
647                 mockShardActor2);
648             shardManager2.tell(new RoleChangeNotification(memberId2,
649                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
650             shardManager1.underlyingActor().waitForMemberUp();
651
652             shardManager1.tell(new FindPrimary("default", true), getRef());
653
654             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
655             String path = found.getPrimaryPath();
656             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
657
658             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
659                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
660
661             shardManager1.underlyingActor().waitForUnreachableMember();
662
663             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
664             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
665             MessageCollectorActor.clearMessages(mockShardActor1);
666
667             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
668                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
669
670             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
671
672             shardManager1.tell(new FindPrimary("default", true), getRef());
673
674             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
675
676             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
677                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
678
679             shardManager1.underlyingActor().waitForReachableMember();
680
681             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
682             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
683             MessageCollectorActor.clearMessages(mockShardActor1);
684
685             shardManager1.tell(new FindPrimary("default", true), getRef());
686
687             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
688             String path1 = found1.getPrimaryPath();
689             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
690
691             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
692                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
693
694             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
695
696         }};
697
698         JavaTestKit.shutdownActorSystem(system1);
699         JavaTestKit.shutdownActorSystem(system2);
700     }
701
702     @Test
703     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
704         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
705
706         // Create an ActorSystem ShardManager actor for member-1.
707
708         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
709         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
710
711         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
712
713         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
714             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
715                 new MockConfiguration()), shardManagerID);
716
717         // Create an ActorSystem ShardManager actor for member-2.
718
719         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
720
721         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
722
723         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
724
725         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
726             put("default", Arrays.asList("member-1", "member-2")).build());
727
728         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
729             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
730                 mockConfig2), shardManagerID);
731
732         new JavaTestKit(system1) {{
733
734             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
735             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
736             shardManager1.tell(new ActorInitialized(), mockShardActor1);
737             shardManager2.tell(new ActorInitialized(), mockShardActor2);
738
739             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
740             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
741             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
742                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
743             shardManager1.tell(new RoleChangeNotification(memberId1,
744                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
745             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
746                     DataStoreVersions.CURRENT_VERSION),
747                 mockShardActor2);
748             shardManager2.tell(new RoleChangeNotification(memberId2,
749                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
750             shardManager1.underlyingActor().waitForMemberUp();
751
752             shardManager1.tell(new FindPrimary("default", true), getRef());
753
754             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
755             String path = found.getPrimaryPath();
756             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
757
758             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
759                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
760
761             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
762                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
763
764             shardManager1.underlyingActor().waitForUnreachableMember();
765
766             shardManager1.tell(new FindPrimary("default", true), getRef());
767
768             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
769
770             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
771
772             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
773                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
774             shardManager1.tell(new RoleChangeNotification(memberId1,
775                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
776
777             shardManager1.tell(new FindPrimary("default", true), getRef());
778
779             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
780             String path1 = found1.getPrimaryPath();
781             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
782
783         }};
784
785         JavaTestKit.shutdownActorSystem(system1);
786         JavaTestKit.shutdownActorSystem(system2);
787     }
788
789
790     @Test
791     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
792         new JavaTestKit(getSystem()) {{
793             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
794
795             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
796
797             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
798
799             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
800
801             assertEquals("getShardName", "non-existent", notFound.getShardName());
802         }};
803     }
804
805     @Test
806     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
807         new JavaTestKit(getSystem()) {{
808             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
809
810             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
811             shardManager.tell(new ActorInitialized(), mockShardActor);
812
813             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
814
815             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
816
817             assertTrue("Found path contains " + found.getPath().path().toString(),
818                     found.getPath().path().toString().contains("member-1-shard-default-config"));
819         }};
820     }
821
822     @Test
823     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
824         new JavaTestKit(getSystem()) {{
825             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
826
827             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
828
829             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
830         }};
831     }
832
833     @Test
834     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
835         new JavaTestKit(getSystem()) {{
836             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
837
838             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
839
840             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
841             // delayed until we send ActorInitialized.
842             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
843                     new Timeout(5, TimeUnit.SECONDS));
844
845             shardManager.tell(new ActorInitialized(), mockShardActor);
846
847             Object resp = Await.result(future, duration("5 seconds"));
848             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
849         }};
850     }
851
852     @Test
853     public void testOnRecoveryJournalIsCleaned() {
854         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
855                 ImmutableSet.of("foo")));
856         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
857                 ImmutableSet.of("bar")));
858         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
859
860         new JavaTestKit(getSystem()) {{
861             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
862
863             shardManager.underlyingActor().waitForRecoveryComplete();
864             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
865
866             // Journal entries up to the last one should've been deleted
867             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
868             synchronized (journal) {
869                 assertEquals("Journal size", 0, journal.size());
870             }
871         }};
872     }
873
874     @Test
875     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
876         TestShardManager shardManager = newTestShardManager();
877
878         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
879         shardManager.onReceiveCommand(new RoleChangeNotification(
880                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
881
882         verify(ready, never()).countDown();
883
884         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
885                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
886
887         verify(ready, times(1)).countDown();
888     }
889
890     @Test
891     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
892         new JavaTestKit(getSystem()) {{
893             TestShardManager shardManager = newTestShardManager();
894
895             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
896             shardManager.onReceiveCommand(new RoleChangeNotification(
897                     memberId, null, RaftState.Follower.name()));
898
899             verify(ready, never()).countDown();
900
901             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
902
903             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
904                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
905                     DataStoreVersions.CURRENT_VERSION));
906
907             verify(ready, times(1)).countDown();
908         }};
909     }
910
911     @Test
912     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
913         new JavaTestKit(getSystem()) {{
914             TestShardManager shardManager = newTestShardManager();
915
916             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
917             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
918
919             verify(ready, never()).countDown();
920
921             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
922                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
923                     DataStoreVersions.CURRENT_VERSION));
924
925             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
926
927             verify(ready, times(1)).countDown();
928         }};
929     }
930
931     @Test
932     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
933         TestShardManager shardManager = newTestShardManager();
934
935         shardManager.onReceiveCommand(new RoleChangeNotification(
936                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
937
938         verify(ready, never()).countDown();
939     }
940
941     @Test
942     public void testByDefaultSyncStatusIsFalse() throws Exception{
943         TestShardManager shardManager = newTestShardManager();
944
945         assertEquals(false, shardManager.getMBean().getSyncStatus());
946     }
947
948     @Test
949     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
950         TestShardManager shardManager = newTestShardManager();
951
952         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
953                 RaftState.Follower.name(), RaftState.Leader.name()));
954
955         assertEquals(true, shardManager.getMBean().getSyncStatus());
956     }
957
958     @Test
959     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
960         TestShardManager shardManager = newTestShardManager();
961
962         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
963         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
964                 RaftState.Follower.name(), RaftState.Candidate.name()));
965
966         assertEquals(false, shardManager.getMBean().getSyncStatus());
967
968         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
969         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
970                 true, shardId));
971
972         assertEquals(false, shardManager.getMBean().getSyncStatus());
973     }
974
975     @Test
976     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
977         TestShardManager shardManager = newTestShardManager();
978
979         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
980         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
981                 RaftState.Candidate.name(), RaftState.Follower.name()));
982
983         // Initially will be false
984         assertEquals(false, shardManager.getMBean().getSyncStatus());
985
986         // Send status true will make sync status true
987         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
988
989         assertEquals(true, shardManager.getMBean().getSyncStatus());
990
991         // Send status false will make sync status false
992         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
993
994         assertEquals(false, shardManager.getMBean().getSyncStatus());
995
996     }
997
998     @Test
999     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1000         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1001             @Override
1002             public List<String> getMemberShardNames(String memberName) {
1003                 return Arrays.asList("default", "astronauts");
1004             }
1005         }));
1006
1007         // Initially will be false
1008         assertEquals(false, shardManager.getMBean().getSyncStatus());
1009
1010         // Make default shard leader
1011         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1012         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1013                 RaftState.Follower.name(), RaftState.Leader.name()));
1014
1015         // default = Leader, astronauts is unknown so sync status remains false
1016         assertEquals(false, shardManager.getMBean().getSyncStatus());
1017
1018         // Make astronauts shard leader as well
1019         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1020         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1021                 RaftState.Follower.name(), RaftState.Leader.name()));
1022
1023         // Now sync status should be true
1024         assertEquals(true, shardManager.getMBean().getSyncStatus());
1025
1026         // Make astronauts a Follower
1027         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1028                 RaftState.Leader.name(), RaftState.Follower.name()));
1029
1030         // Sync status is not true
1031         assertEquals(false, shardManager.getMBean().getSyncStatus());
1032
1033         // Make the astronauts follower sync status true
1034         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1035
1036         // Sync status is now true
1037         assertEquals(true, shardManager.getMBean().getSyncStatus());
1038
1039     }
1040
1041     @Test
1042     public void testOnReceiveSwitchShardBehavior() throws Exception {
1043         new JavaTestKit(getSystem()) {{
1044             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
1045
1046             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1047             shardManager.tell(new ActorInitialized(), mockShardActor);
1048
1049             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
1050
1051             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1052
1053             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1054             assertEquals(1000, switchBehavior.getNewTerm());
1055         }};
1056     }
1057
1058     @Test
1059     public void testOnReceiveCreateShard() {
1060         new JavaTestKit(getSystem()) {{
1061             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1062
1063             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1064                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1065
1066             SchemaContext schemaContext = TestModel.createTestContext();
1067             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1068
1069             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1070                     persistent(false).build();
1071             Shard.Builder shardBuilder = Shard.builder();
1072
1073             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1074                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1075             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1076
1077             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
1078
1079             shardManager.tell(new FindLocalShard("foo", true), getRef());
1080
1081             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1082
1083             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1084             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1085                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1086             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1087                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1088                     shardBuilder.getPeerAddresses().keySet());
1089             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1090                     shardBuilder.getId());
1091             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1092
1093             // Send CreateShard with same name - should fail.
1094
1095             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1096
1097             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1098         }};
1099     }
1100
1101     @Test
1102     public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
1103         new JavaTestKit(getSystem()) {{
1104             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1105                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1106
1107             Shard.Builder shardBuilder = Shard.builder();
1108
1109             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1110                     "foo", null, Arrays.asList("member-1"));
1111             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1112
1113             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
1114
1115             SchemaContext schemaContext = TestModel.createTestContext();
1116             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1117
1118             shardManager.tell(new FindLocalShard("foo", true), getRef());
1119
1120             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1121
1122             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1123             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1124         }};
1125     }
1126
1127     @Test
1128     public void testGetSnapshot() throws Throwable {
1129         JavaTestKit kit = new JavaTestKit(getSystem());
1130
1131         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1132                    put("shard1", Arrays.asList("member-1")).
1133                    put("shard2", Arrays.asList("member-1")).build());
1134
1135         ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher(
1136                 Dispatchers.DefaultDispatcherId()));
1137
1138         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1139         Failure failure = kit.expectMsgClass(Failure.class);
1140         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1141
1142         kit = new JavaTestKit(getSystem());
1143
1144         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1145
1146         shardManager.tell(new FindLocalShard("shard1", true), kit.getRef());
1147         kit.expectMsgClass(LocalShardFound.class);
1148         shardManager.tell(new FindLocalShard("shard2", true), kit.getRef());
1149         kit.expectMsgClass(LocalShardFound.class);
1150
1151         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1152
1153         DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
1154
1155         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1156         List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
1157         Set<String> actualShardNames = new HashSet<>();
1158         for(ShardSnapshot s: shardSnapshots) {
1159             actualShardNames.add(s.getName());
1160         }
1161
1162         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames);
1163
1164         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
1165     }
1166
1167     @Test
1168     public void testAddShardReplicaForNonExistentShard() throws Exception {
1169         new JavaTestKit(getSystem()) {{
1170             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1171                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1172
1173             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1174             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1175
1176             assertEquals("Failure obtained", true,
1177                           (resp.cause() instanceof IllegalArgumentException));
1178         }};
1179     }
1180
1181     @Test
1182     public void testAddShardReplicaForAlreadyCreatedShard() throws Exception {
1183         new JavaTestKit(getSystem()) {{
1184             ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
1185             shardManager.tell(new AddShardReplica("default"), getRef());
1186             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1187             assertEquals("Failure obtained", true,
1188                           (resp.cause() instanceof IllegalArgumentException));
1189         }};
1190     }
1191
1192     @Test
1193     public void testAddShardReplica() throws Exception {
1194         MockConfiguration mockConfig =
1195                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1196                    put("default", Arrays.asList("member-1", "member-2")).
1197                    put("astronauts", Arrays.asList("member-2")).build());
1198
1199         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1200
1201         // Create an ActorSystem ShardManager actor for member-1.
1202         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1203         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1204         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1205         final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1206                 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1207                    new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1208
1209         // Create an ActorSystem ShardManager actor for member-2.
1210         final ActorSystem system2 = ActorSystem.create("cluster-test",
1211             ConfigFactory.load().getConfig("Member2"));
1212         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1213
1214         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1215         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1216             TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1217         final TestActorRef<ForwardingShardManager> leaderShardManager = TestActorRef.create(system2,
1218                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor,
1219                         new ClusterWrapperImpl(system2), mockConfig), shardManagerID);
1220
1221         new JavaTestKit(system1) {{
1222
1223             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1224             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1225
1226             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1227
1228             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1229             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1230             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1231                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1232             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1233                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1234
1235             newReplicaShardManager.underlyingActor().waitForMemberUp();
1236             leaderShardManager.underlyingActor().waitForMemberUp();
1237
1238             //construct a mock response message
1239             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1240             mockShardLeaderActor.underlyingActor().updateResponse(response);
1241             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1242             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1243                 AddServer.class);
1244             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1245             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1246             newReplicaShardManager.underlyingActor()
1247                 .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
1248             expectMsgClass(duration("5 seconds"), Status.Success.class);
1249         }};
1250
1251         JavaTestKit.shutdownActorSystem(system1);
1252         JavaTestKit.shutdownActorSystem(system2);
1253     }
1254
1255     @Test
1256     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1257         MockConfiguration mockConfig =
1258                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1259                    put("default", Arrays.asList("member-1", "member-2")).
1260                    put("astronauts", Arrays.asList("member-2")).build());
1261
1262         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1263
1264         // Create an ActorSystem ShardManager actor for member-1.
1265         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1266         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1267         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1268         final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1269                 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1270                    new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1271
1272         new JavaTestKit(system1) {{
1273
1274             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1275             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1276             newReplicaShardManager.underlyingActor().waitForMemberUp();
1277
1278             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1279             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1280             assertEquals("Failure obtained", true,
1281                           (resp.cause() instanceof RuntimeException));
1282         }};
1283
1284         JavaTestKit.shutdownActorSystem(system1);
1285     }
1286
1287     @Test
1288     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1289         new JavaTestKit(getSystem()) {{
1290             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1291                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1292
1293             shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
1294             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1295             assertEquals("Failure obtained", true,
1296                          (resp.cause() instanceof IllegalArgumentException));
1297         }};
1298
1299     }
1300
1301     @Test
1302     public void testShardPersistenceWithRestoredData() throws Exception {
1303         new JavaTestKit(getSystem()) {{
1304             MockConfiguration mockConfig =
1305                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1306                    put("default", Arrays.asList("member-1", "member-2")).
1307                    put("astronauts", Arrays.asList("member-2")).
1308                    put("people", Arrays.asList("member-1", "member-2")).build());
1309             String[] restoredShards = {"default", "astronauts"};
1310             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1311             InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot);
1312
1313             //create shardManager to come up with restored data
1314             TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
1315                     newShardMgrProps(mockConfig));
1316
1317             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1318
1319             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1320             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1321             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1322
1323             //Verify a local shard is created for the restored shards,
1324             //although we expect a NotInitializedException for the shards as the actor initialization
1325             //message is not sent for them
1326             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1327             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1328
1329             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1330             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1331         }};
1332     }
1333
1334
1335     private static class TestShardManager extends ShardManager {
1336         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1337
1338         private TestShardManager(Builder builder) {
1339             super(builder);
1340         }
1341
1342         @Override
1343         public void handleRecover(Object message) throws Exception {
1344             try {
1345                 super.handleRecover(message);
1346             } finally {
1347                 if(message instanceof RecoveryCompleted) {
1348                     recoveryComplete.countDown();
1349                 }
1350             }
1351         }
1352
1353         void waitForRecoveryComplete() {
1354             assertEquals("Recovery complete", true,
1355                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1356         }
1357
1358         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
1359             return new Builder(datastoreContextBuilder);
1360         }
1361
1362         private static class Builder extends ShardManager.Builder {
1363             Builder(DatastoreContext.Builder datastoreContextBuilder) {
1364                 cluster(new MockClusterWrapper()).configuration(new MockConfiguration());
1365                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
1366                 waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
1367             }
1368
1369             @Override
1370             public Props props() {
1371                 verify();
1372                 return Props.create(TestShardManager.class, this);
1373             }
1374         }
1375     }
1376
1377     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1378         private static final long serialVersionUID = 1L;
1379         private final Creator<ShardManager> delegate;
1380
1381         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1382             this.delegate = delegate;
1383         }
1384
1385         @Override
1386         public ShardManager create() throws Exception {
1387             return delegate.create();
1388         }
1389     }
1390
1391     private static class ForwardingShardManager extends ShardManager {
1392         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1393         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1394         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1395         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1396         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1397         private final ActorRef shardActor;
1398         private final String name;
1399         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1400         private ShardManagerSnapshot snapshot;
1401
1402         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
1403             super(builder);
1404             this.shardActor = shardActor;
1405             this.name = name;
1406         }
1407
1408         @Override
1409         public void handleCommand(Object message) throws Exception {
1410             try{
1411                 super.handleCommand(message);
1412             } finally {
1413                 if(message instanceof FindPrimary) {
1414                     findPrimaryMessageReceived.countDown();
1415                 } else if(message instanceof ClusterEvent.MemberUp) {
1416                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1417                     if(!getCluster().getCurrentMemberName().equals(role)) {
1418                         memberUpReceived.countDown();
1419                     }
1420                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1421                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1422                     if(!getCluster().getCurrentMemberName().equals(role)) {
1423                         memberRemovedReceived.countDown();
1424                     }
1425                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1426                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1427                     if(!getCluster().getCurrentMemberName().equals(role)) {
1428                         memberUnreachableReceived.countDown();
1429                     }
1430                 } else if(message instanceof ClusterEvent.ReachableMember) {
1431                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1432                     if(!getCluster().getCurrentMemberName().equals(role)) {
1433                         memberReachableReceived.countDown();
1434                     }
1435                 }
1436             }
1437         }
1438
1439         @Override
1440         public String persistenceId() {
1441             return name;
1442         }
1443
1444         @Override
1445         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1446             return shardActor;
1447         }
1448
1449         void waitForMemberUp() {
1450             assertEquals("MemberUp received", true,
1451                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1452             memberUpReceived = new CountDownLatch(1);
1453         }
1454
1455         void waitForMemberRemoved() {
1456             assertEquals("MemberRemoved received", true,
1457                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1458             memberRemovedReceived = new CountDownLatch(1);
1459         }
1460
1461         void waitForUnreachableMember() {
1462             assertEquals("UnreachableMember received", true,
1463                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1464                 ));
1465             memberUnreachableReceived = new CountDownLatch(1);
1466         }
1467
1468         void waitForReachableMember() {
1469             assertEquals("ReachableMember received", true,
1470                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1471             memberReachableReceived = new CountDownLatch(1);
1472         }
1473
1474         void verifyFindPrimary() {
1475             assertEquals("FindPrimary received", true,
1476                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1477             findPrimaryMessageReceived = new CountDownLatch(1);
1478         }
1479
1480         @Override
1481         public void saveSnapshot(Object obj) {
1482             snapshot = (ShardManagerSnapshot) obj;
1483             snapshotPersist.countDown();
1484         }
1485
1486         void verifySnapshotPersisted(Set<String> shardList) {
1487             assertEquals("saveSnapshot invoked", true,
1488                 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
1489             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
1490         }
1491     }
1492
1493     private static class MockRespondActor extends MessageCollectorActor {
1494
1495         private volatile Object responseMsg;
1496
1497         public void updateResponse(Object response) {
1498             responseMsg = response;
1499         }
1500
1501         @Override
1502         public void onReceive(Object message) throws Exception {
1503             super.onReceive(message);
1504             if (message instanceof AddServer) {
1505                 if (responseMsg != null) {
1506                     getSender().tell(responseMsg, getSelf());
1507                     responseMsg = null;
1508                 }
1509             }
1510         }
1511     }
1512 }