CDS: Retry remote front-end transactions on AskTimeoutException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNull;
5 import static org.junit.Assert.assertSame;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.mock;
8 import static org.mockito.Mockito.never;
9 import static org.mockito.Mockito.times;
10 import static org.mockito.Mockito.verify;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.AddressFromURIString;
14 import akka.actor.Props;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterEvent;
17 import akka.dispatch.Dispatchers;
18 import akka.japi.Creator;
19 import akka.pattern.Patterns;
20 import akka.persistence.RecoveryCompleted;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.ImmutableSet;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.typesafe.config.ConfigFactory;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.Mock;
38 import org.mockito.MockitoAnnotations;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
44 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
45 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
46 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
50 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
51 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
52 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
53 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
54 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
55 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
56 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
57 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
58 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
59 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
60 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
61 import org.opendaylight.controller.cluster.raft.RaftState;
62 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import scala.concurrent.Await;
68 import scala.concurrent.Future;
69 import scala.concurrent.duration.FiniteDuration;
70
71 public class ShardManagerTest extends AbstractActorTest {
72     private static int ID_COUNTER = 1;
73
74     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
75     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
76
77     @Mock
78     private static CountDownLatch ready;
79
80     private static TestActorRef<MessageCollectorActor> mockShardActor;
81
82     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
83             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
84                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
85
86     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
87         String name = new ShardIdentifier(shardName, memberName,"config").toString();
88         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
89     }
90
91     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
92
93     @Before
94     public void setUp() {
95         MockitoAnnotations.initMocks(this);
96
97         InMemoryJournal.clear();
98
99         if(mockShardActor == null) {
100             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
101             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
102         }
103
104         mockShardActor.underlyingActor().clear();
105     }
106
107     @After
108     public void tearDown() {
109         InMemoryJournal.clear();
110     }
111
112     private Props newShardMgrProps(boolean persistent) {
113         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
114                 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
115     }
116
117     private Props newPropsShardMgrWithMockShardActor() {
118         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
119                 new MockConfiguration());
120     }
121
122     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
123             final ClusterWrapper clusterWrapper, final Configuration config) {
124         Creator<ShardManager> creator = new Creator<ShardManager>() {
125             private static final long serialVersionUID = 1L;
126             @Override
127             public ShardManager create() throws Exception {
128                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
129                         ready, name, shardActor, primaryShardInfoCache);
130             }
131         };
132
133         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
134     }
135
136     @Test
137     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
138         new JavaTestKit(getSystem()) {{
139             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
140
141             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
142
143             shardManager.tell(new FindPrimary("non-existent", false), getRef());
144
145             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
146         }};
147     }
148
149     @Test
150     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
151         new JavaTestKit(getSystem()) {{
152             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
153
154             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
155
156             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
157             shardManager.tell(new ActorInitialized(), mockShardActor);
158
159             DataTree mockDataTree = mock(DataTree.class);
160             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
161                     DataStoreVersions.CURRENT_VERSION), getRef());
162
163             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
164             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
165                     RaftState.Leader.name())), mockShardActor);
166
167             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
168
169             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
170             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
171                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
172             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
173         }};
174     }
175
176     @Test
177     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
178         new JavaTestKit(getSystem()) {{
179             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
180
181             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
182             shardManager.tell(new ActorInitialized(), mockShardActor);
183
184             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
185             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
186             shardManager.tell(new RoleChangeNotification(memberId1,
187                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
188             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
189
190             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
191
192             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
193         }};
194     }
195
196     @Test
197     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
198         new JavaTestKit(getSystem()) {{
199             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
200
201             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
202             shardManager.tell(new ActorInitialized(), mockShardActor);
203
204             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
205             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
206
207             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
208             shardManager.tell(new RoleChangeNotification(memberId1,
209                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
210             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
211             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
212                     leaderVersion), mockShardActor);
213
214             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
215
216             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
217             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
218                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
219             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
220         }};
221     }
222
223     @Test
224     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
225         new JavaTestKit(getSystem()) {{
226             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
227
228             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
229
230             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
231         }};
232     }
233
234     @Test
235     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
236         new JavaTestKit(getSystem()) {{
237             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
238
239             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
240             shardManager.tell(new ActorInitialized(), mockShardActor);
241
242             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
243
244             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
245         }};
246     }
247
248     @Test
249     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
250         new JavaTestKit(getSystem()) {{
251             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
252
253             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
254             shardManager.tell(new ActorInitialized(), mockShardActor);
255
256             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
257             shardManager.tell(new RoleChangeNotification(memberId,
258                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
259
260             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
261
262             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
263
264             DataTree mockDataTree = mock(DataTree.class);
265             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
266                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
267
268             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
269
270             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
271             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
272                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
273             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
274         }};
275     }
276
277     @Test
278     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
279         new JavaTestKit(getSystem()) {{
280             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
281
282             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
283
284             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
285             // delayed until we send ActorInitialized and RoleChangeNotification.
286             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
287
288             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
289
290             shardManager.tell(new ActorInitialized(), mockShardActor);
291
292             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
293
294             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
295             shardManager.tell(new RoleChangeNotification(memberId,
296                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
297
298             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
299
300             DataTree mockDataTree = mock(DataTree.class);
301             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
302                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
303
304             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
305             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
306                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
307             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
308
309             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
310         }};
311     }
312
313     @Test
314     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
315         new JavaTestKit(getSystem()) {{
316             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
317
318             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
319
320             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
321
322             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
323
324             shardManager.tell(new ActorInitialized(), mockShardActor);
325
326             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
327         }};
328     }
329
330     @Test
331     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
332         new JavaTestKit(getSystem()) {{
333             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
334
335             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
336             shardManager.tell(new ActorInitialized(), mockShardActor);
337             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
338                     null, RaftState.Candidate.name()), mockShardActor);
339
340             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
341
342             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
343         }};
344     }
345
346     @Test
347     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
348         new JavaTestKit(getSystem()) {{
349             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
350
351             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
352             shardManager.tell(new ActorInitialized(), mockShardActor);
353             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
354                     null, RaftState.IsolatedLeader.name()), mockShardActor);
355
356             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
357
358             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
359         }};
360     }
361
362     @Test
363     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
364         new JavaTestKit(getSystem()) {{
365             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
366
367             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
368             shardManager.tell(new ActorInitialized(), mockShardActor);
369
370             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
371
372             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
373         }};
374     }
375
376     @Test
377     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
378         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
379
380         // Create an ActorSystem ShardManager actor for member-1.
381
382         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
383         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
384
385         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
386
387         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
388                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
389                         new MockConfiguration()), shardManagerID);
390
391         // Create an ActorSystem ShardManager actor for member-2.
392
393         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
394
395         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
396
397         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
398
399         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
400                 put("default", Arrays.asList("member-1", "member-2")).
401                 put("astronauts", Arrays.asList("member-2")).build());
402
403         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
404                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
405                         mockConfig2), shardManagerID);
406
407         new JavaTestKit(system1) {{
408
409             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
410             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
411
412             shardManager2.tell(new ActorInitialized(), mockShardActor2);
413
414             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
415             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
416             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
417                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
418             shardManager2.tell(new RoleChangeNotification(memberId2,
419                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
420
421             shardManager1.underlyingActor().waitForMemberUp();
422
423             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
424
425             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
426             String path = found.getPrimaryPath();
427             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
428             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
429
430             shardManager2.underlyingActor().verifyFindPrimary();
431
432             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
433
434             shardManager1.underlyingActor().waitForMemberRemoved();
435
436             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
437
438             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
439         }};
440
441         JavaTestKit.shutdownActorSystem(system1);
442         JavaTestKit.shutdownActorSystem(system2);
443     }
444
445     @Test
446     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
447         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
448
449         // Create an ActorSystem ShardManager actor for member-1.
450
451         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
452         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
453
454         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
455
456         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
457             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
458                 new MockConfiguration()), shardManagerID);
459
460         // Create an ActorSystem ShardManager actor for member-2.
461
462         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
463
464         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
465
466         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
467
468         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
469             put("default", Arrays.asList("member-1", "member-2")).build());
470
471         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
472             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
473                 mockConfig2), shardManagerID);
474
475         new JavaTestKit(system1) {{
476
477             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
478             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
479             shardManager1.tell(new ActorInitialized(), mockShardActor1);
480             shardManager2.tell(new ActorInitialized(), mockShardActor2);
481
482             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
483             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
484             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
485                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
486             shardManager1.tell(new RoleChangeNotification(memberId1,
487                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
488             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
489                     DataStoreVersions.CURRENT_VERSION),
490                 mockShardActor2);
491             shardManager2.tell(new RoleChangeNotification(memberId2,
492                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
493             shardManager1.underlyingActor().waitForMemberUp();
494
495             shardManager1.tell(new FindPrimary("default", true), getRef());
496
497             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
498             String path = found.getPrimaryPath();
499             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
500
501             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
502                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
503
504             shardManager1.underlyingActor().waitForUnreachableMember();
505
506             shardManager1.tell(new FindPrimary("default", true), getRef());
507
508             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
509
510             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
511                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
512
513             shardManager1.underlyingActor().waitForReachableMember();
514
515             shardManager1.tell(new FindPrimary("default", true), getRef());
516
517             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
518             String path1 = found1.getPrimaryPath();
519             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
520
521         }};
522
523         JavaTestKit.shutdownActorSystem(system1);
524         JavaTestKit.shutdownActorSystem(system2);
525     }
526
527     @Test
528     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
529         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
530
531         // Create an ActorSystem ShardManager actor for member-1.
532
533         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
534         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
535
536         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
537
538         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
539             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
540                 new MockConfiguration()), shardManagerID);
541
542         // Create an ActorSystem ShardManager actor for member-2.
543
544         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
545
546         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
547
548         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
549
550         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
551             put("default", Arrays.asList("member-1", "member-2")).build());
552
553         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
554             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
555                 mockConfig2), shardManagerID);
556
557         new JavaTestKit(system1) {{
558
559             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
560             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
561             shardManager1.tell(new ActorInitialized(), mockShardActor1);
562             shardManager2.tell(new ActorInitialized(), mockShardActor2);
563
564             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
565             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
566             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
567                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
568             shardManager1.tell(new RoleChangeNotification(memberId1,
569                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
570             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
571                     DataStoreVersions.CURRENT_VERSION),
572                 mockShardActor2);
573             shardManager2.tell(new RoleChangeNotification(memberId2,
574                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
575             shardManager1.underlyingActor().waitForMemberUp();
576
577             shardManager1.tell(new FindPrimary("default", true), getRef());
578
579             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
580             String path = found.getPrimaryPath();
581             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
582
583             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
584                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
585
586             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
587                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
588
589             shardManager1.underlyingActor().waitForUnreachableMember();
590
591             shardManager1.tell(new FindPrimary("default", true), getRef());
592
593             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
594
595             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
596
597             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
598                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
599             shardManager1.tell(new RoleChangeNotification(memberId1,
600                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
601
602             shardManager1.tell(new FindPrimary("default", true), getRef());
603
604             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
605             String path1 = found1.getPrimaryPath();
606             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
607
608         }};
609
610         JavaTestKit.shutdownActorSystem(system1);
611         JavaTestKit.shutdownActorSystem(system2);
612     }
613
614
615     @Test
616     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
617         new JavaTestKit(getSystem()) {{
618             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
619
620             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
621
622             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
623
624             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
625
626             assertEquals("getShardName", "non-existent", notFound.getShardName());
627         }};
628     }
629
630     @Test
631     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
632         new JavaTestKit(getSystem()) {{
633             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
634
635             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
636             shardManager.tell(new ActorInitialized(), mockShardActor);
637
638             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
639
640             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
641
642             assertTrue("Found path contains " + found.getPath().path().toString(),
643                     found.getPath().path().toString().contains("member-1-shard-default-config"));
644         }};
645     }
646
647     @Test
648     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
649         new JavaTestKit(getSystem()) {{
650             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
651
652             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
653
654             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
655         }};
656     }
657
658     @Test
659     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
660         new JavaTestKit(getSystem()) {{
661             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
662
663             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
664
665             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
666             // delayed until we send ActorInitialized.
667             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
668                     new Timeout(5, TimeUnit.SECONDS));
669
670             shardManager.tell(new ActorInitialized(), mockShardActor);
671
672             Object resp = Await.result(future, duration("5 seconds"));
673             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
674         }};
675     }
676
677     @Test
678     public void testOnRecoveryJournalIsCleaned() {
679         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
680                 ImmutableSet.of("foo")));
681         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
682                 ImmutableSet.of("bar")));
683         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
684
685         new JavaTestKit(getSystem()) {{
686             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
687                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
688
689             shardManager.underlyingActor().waitForRecoveryComplete();
690             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
691
692             // Journal entries up to the last one should've been deleted
693             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
694             synchronized (journal) {
695                 assertEquals("Journal size", 0, journal.size());
696             }
697         }};
698     }
699
700     @Test
701     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
702         new JavaTestKit(getSystem()) {
703             {
704                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
705
706                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
707                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
708                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
709
710                 verify(ready, never()).countDown();
711
712                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
713                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
714
715                 verify(ready, times(1)).countDown();
716
717             }};
718     }
719
720     @Test
721     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
722         new JavaTestKit(getSystem()) {
723             {
724                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
725
726                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
727                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
728                         memberId, null, RaftState.Follower.name()));
729
730                 verify(ready, never()).countDown();
731
732                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
733
734                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
735                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
736                         DataStoreVersions.CURRENT_VERSION));
737
738                 verify(ready, times(1)).countDown();
739
740             }};
741     }
742
743     @Test
744     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
745         new JavaTestKit(getSystem()) {
746             {
747                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
748
749                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
750                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
751                         memberId, null, RaftState.Follower.name()));
752
753                 verify(ready, never()).countDown();
754
755                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
756                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
757                         DataStoreVersions.CURRENT_VERSION));
758
759                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
760
761                 verify(ready, times(1)).countDown();
762
763             }};
764     }
765
766     @Test
767     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
768         new JavaTestKit(getSystem()) {
769             {
770                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
771
772                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
773                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
774
775                 verify(ready, never()).countDown();
776
777             }};
778     }
779
780
781     @Test
782     public void testByDefaultSyncStatusIsFalse() throws Exception{
783         final Props persistentProps = newShardMgrProps(true);
784         final TestActorRef<ShardManager> shardManager =
785                 TestActorRef.create(getSystem(), persistentProps);
786
787         ShardManager shardManagerActor = shardManager.underlyingActor();
788
789         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
790     }
791
792     @Test
793     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
794         final Props persistentProps = ShardManager.props(
795                 new MockClusterWrapper(),
796                 new MockConfiguration(),
797                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
798         final TestActorRef<ShardManager> shardManager =
799                 TestActorRef.create(getSystem(), persistentProps);
800
801         ShardManager shardManagerActor = shardManager.underlyingActor();
802         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
803                 RaftState.Follower.name(), RaftState.Leader.name()));
804
805         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
806     }
807
808     @Test
809     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
810         final Props persistentProps = newShardMgrProps(true);
811         final TestActorRef<ShardManager> shardManager =
812                 TestActorRef.create(getSystem(), persistentProps);
813
814         ShardManager shardManagerActor = shardManager.underlyingActor();
815         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
816                 RaftState.Follower.name(), RaftState.Candidate.name()));
817
818         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
819
820         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
821         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
822
823         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
824     }
825
826     @Test
827     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
828         final Props persistentProps = ShardManager.props(
829                 new MockClusterWrapper(),
830                 new MockConfiguration(),
831                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
832         final TestActorRef<ShardManager> shardManager =
833                 TestActorRef.create(getSystem(), persistentProps);
834
835         ShardManager shardManagerActor = shardManager.underlyingActor();
836         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
837                 RaftState.Candidate.name(), RaftState.Follower.name()));
838
839         // Initially will be false
840         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
841
842         // Send status true will make sync status true
843         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
844
845         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
846
847         // Send status false will make sync status false
848         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
849
850         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
851
852     }
853
854     @Test
855     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
856         final Props persistentProps = ShardManager.props(
857                 new MockClusterWrapper(),
858                 new MockConfiguration() {
859                     @Override
860                     public List<String> getMemberShardNames(String memberName) {
861                         return Arrays.asList("default", "astronauts");
862                     }
863                 },
864                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
865         final TestActorRef<ShardManager> shardManager =
866                 TestActorRef.create(getSystem(), persistentProps);
867
868         ShardManager shardManagerActor = shardManager.underlyingActor();
869
870         // Initially will be false
871         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
872
873         // Make default shard leader
874         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
875                 RaftState.Follower.name(), RaftState.Leader.name()));
876
877         // default = Leader, astronauts is unknown so sync status remains false
878         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
879
880         // Make astronauts shard leader as well
881         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
882                 RaftState.Follower.name(), RaftState.Leader.name()));
883
884         // Now sync status should be true
885         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
886
887         // Make astronauts a Follower
888         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
889                 RaftState.Leader.name(), RaftState.Follower.name()));
890
891         // Sync status is not true
892         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
893
894         // Make the astronauts follower sync status true
895         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
896
897         // Sync status is now true
898         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
899
900     }
901
902     private static class TestShardManager extends ShardManager {
903         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
904
905         TestShardManager(String shardMrgIDSuffix) {
906             super(new MockClusterWrapper(), new MockConfiguration(),
907                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
908                     new PrimaryShardInfoFutureCache());
909         }
910
911         @Override
912         public void handleRecover(Object message) throws Exception {
913             try {
914                 super.handleRecover(message);
915             } finally {
916                 if(message instanceof RecoveryCompleted) {
917                     recoveryComplete.countDown();
918                 }
919             }
920         }
921
922         void waitForRecoveryComplete() {
923             assertEquals("Recovery complete", true,
924                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
925         }
926     }
927
928     @SuppressWarnings("serial")
929     static class TestShardManagerCreator implements Creator<TestShardManager> {
930         String shardMrgIDSuffix;
931
932         TestShardManagerCreator(String shardMrgIDSuffix) {
933             this.shardMrgIDSuffix = shardMrgIDSuffix;
934         }
935
936         @Override
937         public TestShardManager create() throws Exception {
938             return new TestShardManager(shardMrgIDSuffix);
939         }
940
941     }
942
943     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
944         private static final long serialVersionUID = 1L;
945         private final Creator<ShardManager> delegate;
946
947         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
948             this.delegate = delegate;
949         }
950
951         @Override
952         public ShardManager create() throws Exception {
953             return delegate.create();
954         }
955     }
956
957     private static class ForwardingShardManager extends ShardManager {
958         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
959         private CountDownLatch memberUpReceived = new CountDownLatch(1);
960         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
961         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
962         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
963         private final ActorRef shardActor;
964         private final String name;
965
966         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
967                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
968                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
969             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
970             this.shardActor = shardActor;
971             this.name = name;
972         }
973
974         @Override
975         public void handleCommand(Object message) throws Exception {
976             try{
977                 super.handleCommand(message);
978             } finally {
979                 if(message instanceof FindPrimary) {
980                     findPrimaryMessageReceived.countDown();
981                 } else if(message instanceof ClusterEvent.MemberUp) {
982                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
983                     if(!getCluster().getCurrentMemberName().equals(role)) {
984                         memberUpReceived.countDown();
985                     }
986                 } else if(message instanceof ClusterEvent.MemberRemoved) {
987                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
988                     if(!getCluster().getCurrentMemberName().equals(role)) {
989                         memberRemovedReceived.countDown();
990                     }
991                 } else if(message instanceof ClusterEvent.UnreachableMember) {
992                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
993                     if(!getCluster().getCurrentMemberName().equals(role)) {
994                         memberUnreachableReceived.countDown();
995                     }
996                 } else if(message instanceof ClusterEvent.ReachableMember) {
997                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
998                     if(!getCluster().getCurrentMemberName().equals(role)) {
999                         memberReachableReceived.countDown();
1000                     }
1001                 }
1002             }
1003         }
1004
1005         @Override
1006         public String persistenceId() {
1007             return name;
1008         }
1009
1010         @Override
1011         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1012             return shardActor;
1013         }
1014
1015         void waitForMemberUp() {
1016             assertEquals("MemberUp received", true,
1017                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1018             memberUpReceived = new CountDownLatch(1);
1019         }
1020
1021         void waitForMemberRemoved() {
1022             assertEquals("MemberRemoved received", true,
1023                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1024             memberRemovedReceived = new CountDownLatch(1);
1025         }
1026
1027         void waitForUnreachableMember() {
1028             assertEquals("UnreachableMember received", true,
1029                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1030                 ));
1031             memberUnreachableReceived = new CountDownLatch(1);
1032         }
1033
1034         void waitForReachableMember() {
1035             assertEquals("ReachableMember received", true,
1036                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1037             memberReachableReceived = new CountDownLatch(1);
1038         }
1039
1040         void verifyFindPrimary() {
1041             assertEquals("FindPrimary received", true,
1042                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1043             findPrimaryMessageReceived = new CountDownLatch(1);
1044         }
1045     }
1046 }