Bug 3020: Add leader version to LeaderStateChanged
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNull;
5 import static org.junit.Assert.assertSame;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.mock;
8 import static org.mockito.Mockito.never;
9 import static org.mockito.Mockito.times;
10 import static org.mockito.Mockito.verify;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.AddressFromURIString;
14 import akka.actor.Props;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterEvent;
17 import akka.dispatch.Dispatchers;
18 import akka.japi.Creator;
19 import akka.pattern.Patterns;
20 import akka.persistence.RecoveryCompleted;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Optional;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.collect.ImmutableSet;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import com.typesafe.config.ConfigFactory;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.Mock;
38 import org.mockito.MockitoAnnotations;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
44 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
45 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
46 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
50 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
51 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
52 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
53 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
54 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
55 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
56 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
57 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
58 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
59 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
60 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
61 import org.opendaylight.controller.cluster.raft.RaftState;
62 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import scala.concurrent.Await;
68 import scala.concurrent.Future;
69 import scala.concurrent.duration.FiniteDuration;
70
71 public class ShardManagerTest extends AbstractActorTest {
72     private static int ID_COUNTER = 1;
73
74     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
75     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
76
77     @Mock
78     private static CountDownLatch ready;
79
80     private static TestActorRef<MessageCollectorActor> mockShardActor;
81
82     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
83             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
84
85     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
86         String name = new ShardIdentifier(shardName, memberName,"config").toString();
87         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
88     }
89
90     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
91
92     @Before
93     public void setUp() {
94         MockitoAnnotations.initMocks(this);
95
96         InMemoryJournal.clear();
97
98         if(mockShardActor == null) {
99             String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
100             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
101         }
102
103         mockShardActor.underlyingActor().clear();
104     }
105
106     @After
107     public void tearDown() {
108         InMemoryJournal.clear();
109     }
110
111     private Props newShardMgrProps(boolean persistent) {
112         return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
113                 datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
114     }
115
116     private Props newPropsShardMgrWithMockShardActor() {
117         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
118                 new MockConfiguration());
119     }
120
121     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
122             final ClusterWrapper clusterWrapper, final Configuration config) {
123         Creator<ShardManager> creator = new Creator<ShardManager>() {
124             private static final long serialVersionUID = 1L;
125             @Override
126             public ShardManager create() throws Exception {
127                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
128                         ready, name, shardActor, primaryShardInfoCache);
129             }
130         };
131
132         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
133     }
134
135     @Test
136     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
137         new JavaTestKit(getSystem()) {{
138             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
139
140             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
141
142             shardManager.tell(new FindPrimary("non-existent", false), getRef());
143
144             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
145         }};
146     }
147
148     @Test
149     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
150         new JavaTestKit(getSystem()) {{
151             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
152
153             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
154
155             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
156             shardManager.tell(new ActorInitialized(), mockShardActor);
157
158             DataTree mockDataTree = mock(DataTree.class);
159             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
160                     DataStoreVersions.CURRENT_VERSION), getRef());
161
162             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
163             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
164                     RaftState.Leader.name())), mockShardActor);
165
166             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
167
168             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
169             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
170                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
171             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
172         }};
173     }
174
175     @Test
176     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
177         new JavaTestKit(getSystem()) {{
178             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
179
180             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
181             shardManager.tell(new ActorInitialized(), mockShardActor);
182
183             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
184             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
185             shardManager.tell(new RoleChangeNotification(memberId1,
186                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
187             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
188
189             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
190
191             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
192         }};
193     }
194
195     @Test
196     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
197         new JavaTestKit(getSystem()) {{
198             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
199
200             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
201             shardManager.tell(new ActorInitialized(), mockShardActor);
202
203             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
204             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
205
206             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
207             shardManager.tell(new RoleChangeNotification(memberId1,
208                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
209             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
210                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
211
212             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
213
214             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
215             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
216                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
217         }};
218     }
219
220     @Test
221     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
222         new JavaTestKit(getSystem()) {{
223             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
224
225             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
226
227             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
228         }};
229     }
230
231     @Test
232     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
233         new JavaTestKit(getSystem()) {{
234             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
235
236             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
237             shardManager.tell(new ActorInitialized(), mockShardActor);
238
239             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
240
241             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
242         }};
243     }
244
245     @Test
246     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
247         new JavaTestKit(getSystem()) {{
248             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
249
250             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
251             shardManager.tell(new ActorInitialized(), mockShardActor);
252
253             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
254             shardManager.tell(new RoleChangeNotification(memberId,
255                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
256
257             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
258
259             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
260
261             DataTree mockDataTree = mock(DataTree.class);
262             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
263                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
264
265             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
266
267             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
268             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
269                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
270             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
271         }};
272     }
273
274     @Test
275     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
276         new JavaTestKit(getSystem()) {{
277             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
278
279             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
280
281             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
282             // delayed until we send ActorInitialized and RoleChangeNotification.
283             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
284
285             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
286
287             shardManager.tell(new ActorInitialized(), mockShardActor);
288
289             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
290
291             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
292             shardManager.tell(new RoleChangeNotification(memberId,
293                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
294
295             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
296
297             DataTree mockDataTree = mock(DataTree.class);
298             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
299                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
300
301             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
302             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
303                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
304             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
305
306             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
307         }};
308     }
309
310     @Test
311     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
312         new JavaTestKit(getSystem()) {{
313             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
314
315             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
316
317             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
318
319             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
320
321             shardManager.tell(new ActorInitialized(), mockShardActor);
322
323             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
324         }};
325     }
326
327     @Test
328     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
329         new JavaTestKit(getSystem()) {{
330             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
331
332             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
333             shardManager.tell(new ActorInitialized(), mockShardActor);
334             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
335                     null, RaftState.Candidate.name()), mockShardActor);
336
337             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
338
339             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
340         }};
341     }
342
343     @Test
344     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
345         new JavaTestKit(getSystem()) {{
346             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
347
348             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
349             shardManager.tell(new ActorInitialized(), mockShardActor);
350
351             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
352
353             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
354         }};
355     }
356
357     @Test
358     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
359         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
360
361         // Create an ActorSystem ShardManager actor for member-1.
362
363         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
364         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
365
366         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
367
368         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
369                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
370                         new MockConfiguration()), shardManagerID);
371
372         // Create an ActorSystem ShardManager actor for member-2.
373
374         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
375
376         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
377
378         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
379
380         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
381                 put("default", Arrays.asList("member-1", "member-2")).
382                 put("astronauts", Arrays.asList("member-2")).build());
383
384         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
385                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
386                         mockConfig2), shardManagerID);
387
388         new JavaTestKit(system1) {{
389
390             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
391             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
392
393             shardManager2.tell(new ActorInitialized(), mockShardActor2);
394
395             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
396             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
397                     Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor2);
398             shardManager2.tell(new RoleChangeNotification(memberId2,
399                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
400
401             shardManager1.underlyingActor().waitForMemberUp();
402
403             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
404
405             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
406             String path = found.getPrimaryPath();
407             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
408
409             shardManager2.underlyingActor().verifyFindPrimary();
410
411             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
412
413             shardManager1.underlyingActor().waitForMemberRemoved();
414
415             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
416
417             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
418         }};
419
420         JavaTestKit.shutdownActorSystem(system1);
421         JavaTestKit.shutdownActorSystem(system2);
422     }
423
424     @Test
425     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
426         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
427
428         // Create an ActorSystem ShardManager actor for member-1.
429
430         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
431         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
432
433         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
434
435         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
436             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
437                 new MockConfiguration()), shardManagerID);
438
439         // Create an ActorSystem ShardManager actor for member-2.
440
441         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
442
443         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
444
445         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
446
447         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
448             put("default", Arrays.asList("member-1", "member-2")).build());
449
450         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
451             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
452                 mockConfig2), shardManagerID);
453
454         new JavaTestKit(system1) {{
455
456             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
457             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
458             shardManager1.tell(new ActorInitialized(), mockShardActor1);
459             shardManager2.tell(new ActorInitialized(), mockShardActor2);
460
461             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
462             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
463             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
464                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
465             shardManager1.tell(new RoleChangeNotification(memberId1,
466                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
467             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
468                     DataStoreVersions.CURRENT_VERSION),
469                 mockShardActor2);
470             shardManager2.tell(new RoleChangeNotification(memberId2,
471                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
472             shardManager1.underlyingActor().waitForMemberUp();
473
474             shardManager1.tell(new FindPrimary("default", true), getRef());
475
476             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
477             String path = found.getPrimaryPath();
478             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
479
480             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
481                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
482
483             shardManager1.underlyingActor().waitForUnreachableMember();
484
485             shardManager1.tell(new FindPrimary("default", true), getRef());
486
487             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
488
489             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
490                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
491
492             shardManager1.underlyingActor().waitForReachableMember();
493
494             shardManager1.tell(new FindPrimary("default", true), getRef());
495
496             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
497             String path1 = found1.getPrimaryPath();
498             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
499
500         }};
501
502         JavaTestKit.shutdownActorSystem(system1);
503         JavaTestKit.shutdownActorSystem(system2);
504     }
505
506     @Test
507     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
508         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
509
510         // Create an ActorSystem ShardManager actor for member-1.
511
512         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
513         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
514
515         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
516
517         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
518             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
519                 new MockConfiguration()), shardManagerID);
520
521         // Create an ActorSystem ShardManager actor for member-2.
522
523         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
524
525         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
526
527         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
528
529         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
530             put("default", Arrays.asList("member-1", "member-2")).build());
531
532         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
533             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
534                 mockConfig2), shardManagerID);
535
536         new JavaTestKit(system1) {{
537
538             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
539             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
540             shardManager1.tell(new ActorInitialized(), mockShardActor1);
541             shardManager2.tell(new ActorInitialized(), mockShardActor2);
542
543             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
544             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
545             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
546                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
547             shardManager1.tell(new RoleChangeNotification(memberId1,
548                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
549             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
550                     DataStoreVersions.CURRENT_VERSION),
551                 mockShardActor2);
552             shardManager2.tell(new RoleChangeNotification(memberId2,
553                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
554             shardManager1.underlyingActor().waitForMemberUp();
555
556             shardManager1.tell(new FindPrimary("default", true), getRef());
557
558             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
559             String path = found.getPrimaryPath();
560             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
561
562             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
563                     mockShardActor1.path()), Optional.<DataTree>absent()));
564
565             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
566                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
567
568             shardManager1.underlyingActor().waitForUnreachableMember();
569
570             shardManager1.tell(new FindPrimary("default", true), getRef());
571
572             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
573
574             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
575
576             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
577                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
578             shardManager1.tell(new RoleChangeNotification(memberId1,
579                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
580
581             shardManager1.tell(new FindPrimary("default", true), getRef());
582
583             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
584             String path1 = found1.getPrimaryPath();
585             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
586
587         }};
588
589         JavaTestKit.shutdownActorSystem(system1);
590         JavaTestKit.shutdownActorSystem(system2);
591     }
592
593
594     @Test
595     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
596         new JavaTestKit(getSystem()) {{
597             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
598
599             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
600
601             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
602
603             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
604
605             assertEquals("getShardName", "non-existent", notFound.getShardName());
606         }};
607     }
608
609     @Test
610     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
611         new JavaTestKit(getSystem()) {{
612             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
613
614             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
615             shardManager.tell(new ActorInitialized(), mockShardActor);
616
617             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
618
619             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
620
621             assertTrue("Found path contains " + found.getPath().path().toString(),
622                     found.getPath().path().toString().contains("member-1-shard-default-config"));
623         }};
624     }
625
626     @Test
627     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
628         new JavaTestKit(getSystem()) {{
629             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
630
631             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
632
633             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
634         }};
635     }
636
637     @Test
638     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
639         new JavaTestKit(getSystem()) {{
640             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
641
642             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
643
644             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
645             // delayed until we send ActorInitialized.
646             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
647                     new Timeout(5, TimeUnit.SECONDS));
648
649             shardManager.tell(new ActorInitialized(), mockShardActor);
650
651             Object resp = Await.result(future, duration("5 seconds"));
652             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
653         }};
654     }
655
656     @Test
657     public void testOnRecoveryJournalIsCleaned() {
658         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
659                 ImmutableSet.of("foo")));
660         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
661                 ImmutableSet.of("bar")));
662         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
663
664         new JavaTestKit(getSystem()) {{
665             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
666                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
667
668             shardManager.underlyingActor().waitForRecoveryComplete();
669             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
670
671             // Journal entries up to the last one should've been deleted
672             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
673             synchronized (journal) {
674                 assertEquals("Journal size", 0, journal.size());
675             }
676         }};
677     }
678
679     @Test
680     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
681         new JavaTestKit(getSystem()) {
682             {
683                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
684
685                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
686                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
687                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
688
689                 verify(ready, never()).countDown();
690
691                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
692                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
693
694                 verify(ready, times(1)).countDown();
695
696             }};
697     }
698
699     @Test
700     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
701         new JavaTestKit(getSystem()) {
702             {
703                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
704
705                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
706                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
707                         memberId, null, RaftState.Follower.name()));
708
709                 verify(ready, never()).countDown();
710
711                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
712
713                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
714                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
715                         DataStoreVersions.CURRENT_VERSION));
716
717                 verify(ready, times(1)).countDown();
718
719             }};
720     }
721
722     @Test
723     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
724         new JavaTestKit(getSystem()) {
725             {
726                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
727
728                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
729                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
730                         memberId, null, RaftState.Follower.name()));
731
732                 verify(ready, never()).countDown();
733
734                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
735                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
736                         DataStoreVersions.CURRENT_VERSION));
737
738                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
739
740                 verify(ready, times(1)).countDown();
741
742             }};
743     }
744
745     @Test
746     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
747         new JavaTestKit(getSystem()) {
748             {
749                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
750
751                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
752                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
753
754                 verify(ready, never()).countDown();
755
756             }};
757     }
758
759
760     @Test
761     public void testByDefaultSyncStatusIsFalse() throws Exception{
762         final Props persistentProps = newShardMgrProps(true);
763         final TestActorRef<ShardManager> shardManager =
764                 TestActorRef.create(getSystem(), persistentProps);
765
766         ShardManager shardManagerActor = shardManager.underlyingActor();
767
768         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
769     }
770
771     @Test
772     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
773         final Props persistentProps = ShardManager.props(
774                 new MockClusterWrapper(),
775                 new MockConfiguration(),
776                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
777         final TestActorRef<ShardManager> shardManager =
778                 TestActorRef.create(getSystem(), persistentProps);
779
780         ShardManager shardManagerActor = shardManager.underlyingActor();
781         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
782                 RaftState.Follower.name(), RaftState.Leader.name()));
783
784         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
785     }
786
787     @Test
788     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
789         final Props persistentProps = newShardMgrProps(true);
790         final TestActorRef<ShardManager> shardManager =
791                 TestActorRef.create(getSystem(), persistentProps);
792
793         ShardManager shardManagerActor = shardManager.underlyingActor();
794         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
795                 RaftState.Follower.name(), RaftState.Candidate.name()));
796
797         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
798
799         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
800         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
801
802         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
803     }
804
805     @Test
806     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
807         final Props persistentProps = ShardManager.props(
808                 new MockClusterWrapper(),
809                 new MockConfiguration(),
810                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
811         final TestActorRef<ShardManager> shardManager =
812                 TestActorRef.create(getSystem(), persistentProps);
813
814         ShardManager shardManagerActor = shardManager.underlyingActor();
815         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
816                 RaftState.Candidate.name(), RaftState.Follower.name()));
817
818         // Initially will be false
819         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
820
821         // Send status true will make sync status true
822         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
823
824         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
825
826         // Send status false will make sync status false
827         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
828
829         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
830
831     }
832
833     @Test
834     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
835         final Props persistentProps = ShardManager.props(
836                 new MockClusterWrapper(),
837                 new MockConfiguration() {
838                     @Override
839                     public List<String> getMemberShardNames(String memberName) {
840                         return Arrays.asList("default", "astronauts");
841                     }
842                 },
843                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
844         final TestActorRef<ShardManager> shardManager =
845                 TestActorRef.create(getSystem(), persistentProps);
846
847         ShardManager shardManagerActor = shardManager.underlyingActor();
848
849         // Initially will be false
850         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
851
852         // Make default shard leader
853         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
854                 RaftState.Follower.name(), RaftState.Leader.name()));
855
856         // default = Leader, astronauts is unknown so sync status remains false
857         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
858
859         // Make astronauts shard leader as well
860         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
861                 RaftState.Follower.name(), RaftState.Leader.name()));
862
863         // Now sync status should be true
864         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
865
866         // Make astronauts a Follower
867         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
868                 RaftState.Leader.name(), RaftState.Follower.name()));
869
870         // Sync status is not true
871         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
872
873         // Make the astronauts follower sync status true
874         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
875
876         // Sync status is now true
877         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
878
879     }
880
881     private static class TestShardManager extends ShardManager {
882         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
883
884         TestShardManager(String shardMrgIDSuffix) {
885             super(new MockClusterWrapper(), new MockConfiguration(),
886                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
887                     new PrimaryShardInfoFutureCache());
888         }
889
890         @Override
891         public void handleRecover(Object message) throws Exception {
892             try {
893                 super.handleRecover(message);
894             } finally {
895                 if(message instanceof RecoveryCompleted) {
896                     recoveryComplete.countDown();
897                 }
898             }
899         }
900
901         void waitForRecoveryComplete() {
902             assertEquals("Recovery complete", true,
903                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
904         }
905     }
906
907     @SuppressWarnings("serial")
908     static class TestShardManagerCreator implements Creator<TestShardManager> {
909         String shardMrgIDSuffix;
910
911         TestShardManagerCreator(String shardMrgIDSuffix) {
912             this.shardMrgIDSuffix = shardMrgIDSuffix;
913         }
914
915         @Override
916         public TestShardManager create() throws Exception {
917             return new TestShardManager(shardMrgIDSuffix);
918         }
919
920     }
921
922     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
923         private static final long serialVersionUID = 1L;
924         private final Creator<ShardManager> delegate;
925
926         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
927             this.delegate = delegate;
928         }
929
930         @Override
931         public ShardManager create() throws Exception {
932             return delegate.create();
933         }
934     }
935
936     private static class ForwardingShardManager extends ShardManager {
937         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
938         private CountDownLatch memberUpReceived = new CountDownLatch(1);
939         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
940         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
941         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
942         private final ActorRef shardActor;
943         private final String name;
944
945         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
946                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
947                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
948             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
949             this.shardActor = shardActor;
950             this.name = name;
951         }
952
953         @Override
954         public void handleCommand(Object message) throws Exception {
955             try{
956                 super.handleCommand(message);
957             } finally {
958                 if(message instanceof FindPrimary) {
959                     findPrimaryMessageReceived.countDown();
960                 } else if(message instanceof ClusterEvent.MemberUp) {
961                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
962                     if(!getCluster().getCurrentMemberName().equals(role)) {
963                         memberUpReceived.countDown();
964                     }
965                 } else if(message instanceof ClusterEvent.MemberRemoved) {
966                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
967                     if(!getCluster().getCurrentMemberName().equals(role)) {
968                         memberRemovedReceived.countDown();
969                     }
970                 } else if(message instanceof ClusterEvent.UnreachableMember) {
971                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
972                     if(!getCluster().getCurrentMemberName().equals(role)) {
973                         memberUnreachableReceived.countDown();
974                     }
975                 } else if(message instanceof ClusterEvent.ReachableMember) {
976                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
977                     if(!getCluster().getCurrentMemberName().equals(role)) {
978                         memberReachableReceived.countDown();
979                     }
980                 }
981             }
982         }
983
984         @Override
985         public String persistenceId() {
986             return name;
987         }
988
989         @Override
990         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
991             return shardActor;
992         }
993
994         void waitForMemberUp() {
995             assertEquals("MemberUp received", true,
996                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
997             memberUpReceived = new CountDownLatch(1);
998         }
999
1000         void waitForMemberRemoved() {
1001             assertEquals("MemberRemoved received", true,
1002                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1003             memberRemovedReceived = new CountDownLatch(1);
1004         }
1005
1006         void waitForUnreachableMember() {
1007             assertEquals("UnreachableMember received", true,
1008                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1009                 ));
1010             memberUnreachableReceived = new CountDownLatch(1);
1011         }
1012
1013         void waitForReachableMember() {
1014             assertEquals("ReachableMember received", true,
1015                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1016             memberReachableReceived = new CountDownLatch(1);
1017         }
1018
1019         void verifyFindPrimary() {
1020             assertEquals("FindPrimary received", true,
1021                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1022             findPrimaryMessageReceived = new CountDownLatch(1);
1023         }
1024     }
1025 }