Fix license header violations in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.AddressFromURIString;
22 import akka.actor.Props;
23 import akka.cluster.Cluster;
24 import akka.cluster.ClusterEvent;
25 import akka.dispatch.Dispatchers;
26 import akka.japi.Creator;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.testkit.JavaTestKit;
30 import akka.testkit.TestActorRef;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.collect.ImmutableSet;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.typesafe.config.ConfigFactory;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.Mock;
46 import org.mockito.MockitoAnnotations;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
52 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
53 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
54 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
57 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
58 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
59 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
60 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
61 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
62 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
63 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
64 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
65 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
66 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
67 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
68 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
69 import org.opendaylight.controller.cluster.raft.RaftState;
70 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
71 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
72 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Await;
76 import scala.concurrent.Future;
77 import scala.concurrent.duration.FiniteDuration;
78
79 public class ShardManagerTest extends AbstractActorTest {
    // Shared counter; each test instance consumes one value to build its own suffix.
    private static int ID_COUNTER = 1;

    // Per-instance datastore type ("config" + counter); also used to build shard member ids in the tests.
    private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
    private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;

    // Mock latch handed to every ShardManager built by the props helpers below;
    // populated by MockitoAnnotations.initMocks(this) in setUp().
    @Mock
    private static CountDownLatch ready;

    // Message-collecting stand-in for the local default shard; created lazily in setUp()
    // and cleared before every test.
    private static TestActorRef<MessageCollectorActor> mockShardActor;

    // Builder for the DatastoreContext given to each ShardManager under test.
    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
            dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
                   .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
93
94     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
95         String name = new ShardIdentifier(shardName, memberName,"config").toString();
96         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
97     }
98
    // Cache instance passed to every ShardManager created by the props helpers in this class.
    private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
100
101     @Before
102     public void setUp() {
103         MockitoAnnotations.initMocks(this);
104
105         InMemoryJournal.clear();
106
107         if(mockShardActor == null) {
108             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
109             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
110         }
111
112         mockShardActor.underlyingActor().clear();
113     }
114
    /**
     * Clears the in-memory journal after each test.
     */
    @After
    public void tearDown() {
        InMemoryJournal.clear();
    }
119
120     private Props newShardMgrProps(boolean persistent) {
121         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
122                 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
123     }
124
125     private Props newPropsShardMgrWithMockShardActor() {
126         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
127                 new MockConfiguration());
128     }
129
130     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
131             final ClusterWrapper clusterWrapper, final Configuration config) {
132         Creator<ShardManager> creator = new Creator<ShardManager>() {
133             private static final long serialVersionUID = 1L;
134             @Override
135             public ShardManager create() throws Exception {
136                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
137                         ready, name, shardActor, primaryShardInfoCache);
138             }
139         };
140
141         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
142     }
143
144     @Test
145     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
146         new JavaTestKit(getSystem()) {{
147             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
148
149             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
150
151             shardManager.tell(new FindPrimary("non-existent", false), getRef());
152
153             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
154         }};
155     }
156
157     @Test
158     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
159         new JavaTestKit(getSystem()) {{
160             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
161
162             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
163
164             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
165             shardManager.tell(new ActorInitialized(), mockShardActor);
166
167             DataTree mockDataTree = mock(DataTree.class);
168             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
169                     DataStoreVersions.CURRENT_VERSION), getRef());
170
171             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
172             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
173                     RaftState.Leader.name())), mockShardActor);
174
175             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
176
177             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
178             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
179                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
180             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
181         }};
182     }
183
184     @Test
185     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
186         new JavaTestKit(getSystem()) {{
187             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
188
189             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
190             shardManager.tell(new ActorInitialized(), mockShardActor);
191
192             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
193             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
194             shardManager.tell(new RoleChangeNotification(memberId1,
195                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
196             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
197
198             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
199
200             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
201         }};
202     }
203
204     @Test
205     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
206         new JavaTestKit(getSystem()) {{
207             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
208
209             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
210             shardManager.tell(new ActorInitialized(), mockShardActor);
211
212             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
213             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
214
215             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
216             shardManager.tell(new RoleChangeNotification(memberId1,
217                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
218             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
219             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
220                     leaderVersion), mockShardActor);
221
222             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
223
224             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
225             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
226                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
227             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
228         }};
229     }
230
231     @Test
232     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
233         new JavaTestKit(getSystem()) {{
234             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
235
236             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
237
238             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
239         }};
240     }
241
242     @Test
243     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
244         new JavaTestKit(getSystem()) {{
245             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
246
247             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
248             shardManager.tell(new ActorInitialized(), mockShardActor);
249
250             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
251
252             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
253         }};
254     }
255
256     @Test
257     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
258         new JavaTestKit(getSystem()) {{
259             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
260
261             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
262             shardManager.tell(new ActorInitialized(), mockShardActor);
263
264             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
265             shardManager.tell(new RoleChangeNotification(memberId,
266                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
267
268             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
269
270             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
271
272             DataTree mockDataTree = mock(DataTree.class);
273             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
274                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
275
276             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
277
278             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
279             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
280                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
281             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
282         }};
283     }
284
285     @Test
286     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
287         new JavaTestKit(getSystem()) {{
288             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
289
290             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
291
292             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
293             // delayed until we send ActorInitialized and RoleChangeNotification.
294             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
295
296             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
297
298             shardManager.tell(new ActorInitialized(), mockShardActor);
299
300             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
301
302             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
303             shardManager.tell(new RoleChangeNotification(memberId,
304                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
305
306             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
307
308             DataTree mockDataTree = mock(DataTree.class);
309             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
310                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
311
312             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
313             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
314                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
315             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
316
317             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
318         }};
319     }
320
321     @Test
322     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
323         new JavaTestKit(getSystem()) {{
324             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
325
326             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
327
328             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
329
330             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
331
332             shardManager.tell(new ActorInitialized(), mockShardActor);
333
334             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
335         }};
336     }
337
338     @Test
339     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
340         new JavaTestKit(getSystem()) {{
341             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
342
343             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
344             shardManager.tell(new ActorInitialized(), mockShardActor);
345             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
346                     null, RaftState.Candidate.name()), mockShardActor);
347
348             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
349
350             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
351         }};
352     }
353
354     @Test
355     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
356         new JavaTestKit(getSystem()) {{
357             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
358
359             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
360             shardManager.tell(new ActorInitialized(), mockShardActor);
361             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
362                     null, RaftState.IsolatedLeader.name()), mockShardActor);
363
364             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
365
366             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
367         }};
368     }
369
370     @Test
371     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
372         new JavaTestKit(getSystem()) {{
373             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
374
375             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
376             shardManager.tell(new ActorInitialized(), mockShardActor);
377
378             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
379
380             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
381         }};
382     }
383
384     @Test
385     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
386         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
387
388         // Create an ActorSystem ShardManager actor for member-1.
389
390         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
391         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
392
393         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
394
395         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
396                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
397                         new MockConfiguration()), shardManagerID);
398
399         // Create an ActorSystem ShardManager actor for member-2.
400
401         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
402
403         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
404
405         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
406
407         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
408                 put("default", Arrays.asList("member-1", "member-2")).
409                 put("astronauts", Arrays.asList("member-2")).build());
410
411         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
412                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
413                         mockConfig2), shardManagerID);
414
415         new JavaTestKit(system1) {{
416
417             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
418             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
419
420             shardManager2.tell(new ActorInitialized(), mockShardActor2);
421
422             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
423             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
424             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
425                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
426             shardManager2.tell(new RoleChangeNotification(memberId2,
427                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
428
429             shardManager1.underlyingActor().waitForMemberUp();
430
431             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
432
433             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
434             String path = found.getPrimaryPath();
435             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
436             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
437
438             shardManager2.underlyingActor().verifyFindPrimary();
439
440             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
441
442             shardManager1.underlyingActor().waitForMemberRemoved();
443
444             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
445
446             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
447         }};
448
449         JavaTestKit.shutdownActorSystem(system1);
450         JavaTestKit.shutdownActorSystem(system2);
451     }
452
453     @Test
454     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
455         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
456
457         // Create an ActorSystem ShardManager actor for member-1.
458
459         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
460         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
461
462         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
463
464         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
465             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
466                 new MockConfiguration()), shardManagerID);
467
468         // Create an ActorSystem ShardManager actor for member-2.
469
470         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
471
472         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
473
474         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
475
476         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
477             put("default", Arrays.asList("member-1", "member-2")).build());
478
479         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
480             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
481                 mockConfig2), shardManagerID);
482
483         new JavaTestKit(system1) {{
484
485             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
486             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
487             shardManager1.tell(new ActorInitialized(), mockShardActor1);
488             shardManager2.tell(new ActorInitialized(), mockShardActor2);
489
490             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
491             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
492             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
493                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
494             shardManager1.tell(new RoleChangeNotification(memberId1,
495                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
496             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
497                     DataStoreVersions.CURRENT_VERSION),
498                 mockShardActor2);
499             shardManager2.tell(new RoleChangeNotification(memberId2,
500                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
501             shardManager1.underlyingActor().waitForMemberUp();
502
503             shardManager1.tell(new FindPrimary("default", true), getRef());
504
505             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
506             String path = found.getPrimaryPath();
507             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
508
509             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
510                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
511
512             shardManager1.underlyingActor().waitForUnreachableMember();
513
514             shardManager1.tell(new FindPrimary("default", true), getRef());
515
516             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
517
518             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
519                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
520
521             shardManager1.underlyingActor().waitForReachableMember();
522
523             shardManager1.tell(new FindPrimary("default", true), getRef());
524
525             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
526             String path1 = found1.getPrimaryPath();
527             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
528
529         }};
530
531         JavaTestKit.shutdownActorSystem(system1);
532         JavaTestKit.shutdownActorSystem(system2);
533     }
534
535     @Test
536     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
537         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
538
539         // Create an ActorSystem ShardManager actor for member-1.
540
541         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
542         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
543
544         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
545
546         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
547             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
548                 new MockConfiguration()), shardManagerID);
549
550         // Create an ActorSystem ShardManager actor for member-2.
551
552         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
553
554         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
555
556         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
557
558         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
559             put("default", Arrays.asList("member-1", "member-2")).build());
560
561         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
562             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
563                 mockConfig2), shardManagerID);
564
565         new JavaTestKit(system1) {{
566
567             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
568             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
569             shardManager1.tell(new ActorInitialized(), mockShardActor1);
570             shardManager2.tell(new ActorInitialized(), mockShardActor2);
571
572             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
573             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
574             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
575                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
576             shardManager1.tell(new RoleChangeNotification(memberId1,
577                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
578             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
579                     DataStoreVersions.CURRENT_VERSION),
580                 mockShardActor2);
581             shardManager2.tell(new RoleChangeNotification(memberId2,
582                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
583             shardManager1.underlyingActor().waitForMemberUp();
584
585             shardManager1.tell(new FindPrimary("default", true), getRef());
586
587             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
588             String path = found.getPrimaryPath();
589             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
590
591             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
592                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
593
594             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
595                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
596
597             shardManager1.underlyingActor().waitForUnreachableMember();
598
599             shardManager1.tell(new FindPrimary("default", true), getRef());
600
601             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
602
603             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
604
605             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
606                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
607             shardManager1.tell(new RoleChangeNotification(memberId1,
608                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
609
610             shardManager1.tell(new FindPrimary("default", true), getRef());
611
612             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
613             String path1 = found1.getPrimaryPath();
614             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
615
616         }};
617
618         JavaTestKit.shutdownActorSystem(system1);
619         JavaTestKit.shutdownActorSystem(system2);
620     }
621
622
623     @Test
624     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
625         new JavaTestKit(getSystem()) {{
626             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
627
628             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
629
630             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
631
632             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
633
634             assertEquals("getShardName", "non-existent", notFound.getShardName());
635         }};
636     }
637
638     @Test
639     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
640         new JavaTestKit(getSystem()) {{
641             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
642
643             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
644             shardManager.tell(new ActorInitialized(), mockShardActor);
645
646             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
647
648             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
649
650             assertTrue("Found path contains " + found.getPath().path().toString(),
651                     found.getPath().path().toString().contains("member-1-shard-default-config"));
652         }};
653     }
654
655     @Test
656     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
657         new JavaTestKit(getSystem()) {{
658             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
659
660             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
661
662             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
663         }};
664     }
665
666     @Test
667     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
668         new JavaTestKit(getSystem()) {{
669             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
670
671             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
672
673             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
674             // delayed until we send ActorInitialized.
675             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
676                     new Timeout(5, TimeUnit.SECONDS));
677
678             shardManager.tell(new ActorInitialized(), mockShardActor);
679
680             Object resp = Await.result(future, duration("5 seconds"));
681             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
682         }};
683     }
684
685     @Test
686     public void testOnRecoveryJournalIsCleaned() {
687         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
688                 ImmutableSet.of("foo")));
689         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
690                 ImmutableSet.of("bar")));
691         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
692
693         new JavaTestKit(getSystem()) {{
694             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
695                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
696
697             shardManager.underlyingActor().waitForRecoveryComplete();
698             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
699
700             // Journal entries up to the last one should've been deleted
701             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
702             synchronized (journal) {
703                 assertEquals("Journal size", 0, journal.size());
704             }
705         }};
706     }
707
708     @Test
709     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
710         new JavaTestKit(getSystem()) {
711             {
712                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
713
714                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
715                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
716                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
717
718                 verify(ready, never()).countDown();
719
720                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
721                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
722
723                 verify(ready, times(1)).countDown();
724
725             }};
726     }
727
728     @Test
729     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
730         new JavaTestKit(getSystem()) {
731             {
732                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
733
734                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
735                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
736                         memberId, null, RaftState.Follower.name()));
737
738                 verify(ready, never()).countDown();
739
740                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
741
742                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
743                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
744                         DataStoreVersions.CURRENT_VERSION));
745
746                 verify(ready, times(1)).countDown();
747
748             }};
749     }
750
751     @Test
752     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
753         new JavaTestKit(getSystem()) {
754             {
755                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
756
757                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
758                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
759                         memberId, null, RaftState.Follower.name()));
760
761                 verify(ready, never()).countDown();
762
763                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
764                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
765                         DataStoreVersions.CURRENT_VERSION));
766
767                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
768
769                 verify(ready, times(1)).countDown();
770
771             }};
772     }
773
774     @Test
775     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
776         new JavaTestKit(getSystem()) {
777             {
778                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
779
780                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
781                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
782
783                 verify(ready, never()).countDown();
784
785             }};
786     }
787
788
789     @Test
790     public void testByDefaultSyncStatusIsFalse() throws Exception{
791         final Props persistentProps = newShardMgrProps(true);
792         final TestActorRef<ShardManager> shardManager =
793                 TestActorRef.create(getSystem(), persistentProps);
794
795         ShardManager shardManagerActor = shardManager.underlyingActor();
796
797         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
798     }
799
800     @Test
801     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
802         final Props persistentProps = ShardManager.props(
803                 new MockClusterWrapper(),
804                 new MockConfiguration(),
805                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
806         final TestActorRef<ShardManager> shardManager =
807                 TestActorRef.create(getSystem(), persistentProps);
808
809         ShardManager shardManagerActor = shardManager.underlyingActor();
810         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
811                 RaftState.Follower.name(), RaftState.Leader.name()));
812
813         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
814     }
815
816     @Test
817     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
818         final Props persistentProps = newShardMgrProps(true);
819         final TestActorRef<ShardManager> shardManager =
820                 TestActorRef.create(getSystem(), persistentProps);
821
822         ShardManager shardManagerActor = shardManager.underlyingActor();
823         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
824                 RaftState.Follower.name(), RaftState.Candidate.name()));
825
826         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
827
828         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
829         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
830
831         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
832     }
833
834     @Test
835     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
836         final Props persistentProps = ShardManager.props(
837                 new MockClusterWrapper(),
838                 new MockConfiguration(),
839                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
840         final TestActorRef<ShardManager> shardManager =
841                 TestActorRef.create(getSystem(), persistentProps);
842
843         ShardManager shardManagerActor = shardManager.underlyingActor();
844         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
845                 RaftState.Candidate.name(), RaftState.Follower.name()));
846
847         // Initially will be false
848         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
849
850         // Send status true will make sync status true
851         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
852
853         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
854
855         // Send status false will make sync status false
856         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
857
858         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
859
860     }
861
862     @Test
863     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
864         final Props persistentProps = ShardManager.props(
865                 new MockClusterWrapper(),
866                 new MockConfiguration() {
867                     @Override
868                     public List<String> getMemberShardNames(String memberName) {
869                         return Arrays.asList("default", "astronauts");
870                     }
871                 },
872                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
873         final TestActorRef<ShardManager> shardManager =
874                 TestActorRef.create(getSystem(), persistentProps);
875
876         ShardManager shardManagerActor = shardManager.underlyingActor();
877
878         // Initially will be false
879         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
880
881         // Make default shard leader
882         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
883                 RaftState.Follower.name(), RaftState.Leader.name()));
884
885         // default = Leader, astronauts is unknown so sync status remains false
886         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
887
888         // Make astronauts shard leader as well
889         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
890                 RaftState.Follower.name(), RaftState.Leader.name()));
891
892         // Now sync status should be true
893         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
894
895         // Make astronauts a Follower
896         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
897                 RaftState.Leader.name(), RaftState.Follower.name()));
898
899         // Sync status is not true
900         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
901
902         // Make the astronauts follower sync status true
903         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
904
905         // Sync status is now true
906         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
907
908     }
909
910     private static class TestShardManager extends ShardManager {
911         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
912
913         TestShardManager(String shardMrgIDSuffix) {
914             super(new MockClusterWrapper(), new MockConfiguration(),
915                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
916                     new PrimaryShardInfoFutureCache());
917         }
918
919         @Override
920         public void handleRecover(Object message) throws Exception {
921             try {
922                 super.handleRecover(message);
923             } finally {
924                 if(message instanceof RecoveryCompleted) {
925                     recoveryComplete.countDown();
926                 }
927             }
928         }
929
930         void waitForRecoveryComplete() {
931             assertEquals("Recovery complete", true,
932                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
933         }
934     }
935
936     @SuppressWarnings("serial")
937     static class TestShardManagerCreator implements Creator<TestShardManager> {
938         String shardMrgIDSuffix;
939
940         TestShardManagerCreator(String shardMrgIDSuffix) {
941             this.shardMrgIDSuffix = shardMrgIDSuffix;
942         }
943
944         @Override
945         public TestShardManager create() throws Exception {
946             return new TestShardManager(shardMrgIDSuffix);
947         }
948
949     }
950
951     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
952         private static final long serialVersionUID = 1L;
953         private final Creator<ShardManager> delegate;
954
955         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
956             this.delegate = delegate;
957         }
958
959         @Override
960         public ShardManager create() throws Exception {
961             return delegate.create();
962         }
963     }
964
965     private static class ForwardingShardManager extends ShardManager {
966         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
967         private CountDownLatch memberUpReceived = new CountDownLatch(1);
968         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
969         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
970         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
971         private final ActorRef shardActor;
972         private final String name;
973
974         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
975                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
976                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
977             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
978             this.shardActor = shardActor;
979             this.name = name;
980         }
981
982         @Override
983         public void handleCommand(Object message) throws Exception {
984             try{
985                 super.handleCommand(message);
986             } finally {
987                 if(message instanceof FindPrimary) {
988                     findPrimaryMessageReceived.countDown();
989                 } else if(message instanceof ClusterEvent.MemberUp) {
990                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
991                     if(!getCluster().getCurrentMemberName().equals(role)) {
992                         memberUpReceived.countDown();
993                     }
994                 } else if(message instanceof ClusterEvent.MemberRemoved) {
995                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
996                     if(!getCluster().getCurrentMemberName().equals(role)) {
997                         memberRemovedReceived.countDown();
998                     }
999                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1000                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1001                     if(!getCluster().getCurrentMemberName().equals(role)) {
1002                         memberUnreachableReceived.countDown();
1003                     }
1004                 } else if(message instanceof ClusterEvent.ReachableMember) {
1005                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1006                     if(!getCluster().getCurrentMemberName().equals(role)) {
1007                         memberReachableReceived.countDown();
1008                     }
1009                 }
1010             }
1011         }
1012
1013         @Override
1014         public String persistenceId() {
1015             return name;
1016         }
1017
1018         @Override
1019         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1020             return shardActor;
1021         }
1022
1023         void waitForMemberUp() {
1024             assertEquals("MemberUp received", true,
1025                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1026             memberUpReceived = new CountDownLatch(1);
1027         }
1028
1029         void waitForMemberRemoved() {
1030             assertEquals("MemberRemoved received", true,
1031                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1032             memberRemovedReceived = new CountDownLatch(1);
1033         }
1034
1035         void waitForUnreachableMember() {
1036             assertEquals("UnreachableMember received", true,
1037                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1038                 ));
1039             memberUnreachableReceived = new CountDownLatch(1);
1040         }
1041
1042         void waitForReachableMember() {
1043             assertEquals("ReachableMember received", true,
1044                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1045             memberReachableReceived = new CountDownLatch(1);
1046         }
1047
1048         void verifyFindPrimary() {
1049             assertEquals("FindPrimary received", true,
1050                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1051             findPrimaryMessageReceived = new CountDownLatch(1);
1052         }
1053     }
1054 }