4b2ce82cd51d081d49f82842dc06f932f7cef546
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.Matchers.anyString;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSystem;
25 import akka.actor.AddressFromURIString;
26 import akka.actor.Props;
27 import akka.actor.Status;
28 import akka.actor.Status.Failure;
29 import akka.actor.Status.Success;
30 import akka.cluster.Cluster;
31 import akka.cluster.ClusterEvent;
32 import akka.cluster.Member;
33 import akka.dispatch.Dispatchers;
34 import akka.dispatch.OnComplete;
35 import akka.japi.Creator;
36 import akka.pattern.Patterns;
37 import akka.persistence.RecoveryCompleted;
38 import akka.serialization.Serialization;
39 import akka.testkit.TestActorRef;
40 import akka.testkit.javadsl.TestKit;
41 import akka.util.Timeout;
42 import com.google.common.base.Function;
43 import com.google.common.base.Stopwatch;
44 import com.google.common.collect.ImmutableMap;
45 import com.google.common.collect.Lists;
46 import com.google.common.collect.Sets;
47 import com.google.common.util.concurrent.Uninterruptibles;
48 import java.net.URI;
49 import java.util.AbstractMap;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Map.Entry;
57 import java.util.Set;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import java.util.stream.Collectors;
62 import org.junit.AfterClass;
63 import org.junit.BeforeClass;
64 import org.junit.Test;
65 import org.opendaylight.controller.cluster.access.concepts.MemberName;
66 import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
67 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
68 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
69 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
70 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
71 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
72 import org.opendaylight.controller.cluster.datastore.Shard;
73 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
74 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
75 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
76 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
77 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
78 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
79 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
80 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
81 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
82 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
83 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
84 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
85 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
86 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
87 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
88 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
89 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
90 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
91 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
92 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
93 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
94 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
95 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
96 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
97 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
98 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
99 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
100 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
101 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
102 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
103 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
104 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
105 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
106 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
107 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
108 import org.opendaylight.controller.cluster.raft.RaftState;
109 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
110 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
111 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
112 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
113 import org.opendaylight.controller.cluster.raft.messages.AddServer;
114 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
115 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
116 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
117 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
118 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
119 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
120 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
121 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
122 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
123 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
124 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
125 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
126 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
127 import org.slf4j.Logger;
128 import org.slf4j.LoggerFactory;
129 import scala.concurrent.Await;
130 import scala.concurrent.Future;
131 import scala.concurrent.duration.FiniteDuration;
132
133 public class ShardManagerTest extends AbstractShardManagerTest {
134     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
135     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
136     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
137
138     private static SchemaContext TEST_SCHEMA_CONTEXT;
139
140     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
141
142     @BeforeClass
143     public static void beforeClass() {
144         TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
145     }
146
147     @AfterClass
148     public static void afterClass() {
149         TEST_SCHEMA_CONTEXT = null;
150     }
151
152     private ActorSystem newActorSystem(final String config) {
153         return newActorSystem("cluster-test", config);
154     }
155
156     private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) {
157         String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
158         if (system == getSystem()) {
159             return actorFactory.createActor(MessageCollectorActor.props(), name);
160         }
161
162         return system.actorOf(MessageCollectorActor.props(), name);
163     }
164
165     private Props newShardMgrProps() {
166         return newShardMgrProps(new MockConfiguration());
167     }
168
169     private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
170         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
171         doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
172         doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(anyString());
173         return mockFactory;
174     }
175
176     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
177         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
178     }
179
180     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) {
181         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor)
182                 .distributedDataStore(mock(DistributedDataStore.class));
183     }
184
185
186     private Props newPropsShardMgrWithMockShardActor() {
187         return newTestShardMgrBuilderWithMockShardActor().props().withDispatcher(
188                 Dispatchers.DefaultDispatcherId());
189     }
190
191     private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) {
192         return newTestShardMgrBuilderWithMockShardActor(shardActor).props()
193                 .withDispatcher(Dispatchers.DefaultDispatcherId());
194     }
195
196
197     private TestShardManager newTestShardManager() {
198         return newTestShardManager(newShardMgrProps());
199     }
200
201     private TestShardManager newTestShardManager(final Props props) {
202         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
203         TestShardManager shardManager = shardManagerActor.underlyingActor();
204         shardManager.waitForRecoveryComplete();
205         return shardManager;
206     }
207
208     private static void waitForShardInitialized(final ActorRef shardManager, final String shardName,
209             final TestKit kit) {
210         AssertionError last = null;
211         Stopwatch sw = Stopwatch.createStarted();
212         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
213             try {
214                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
215                 kit.expectMsgClass(LocalShardFound.class);
216                 return;
217             } catch (AssertionError e) {
218                 last = e;
219             }
220
221             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
222         }
223
224         throw last;
225     }
226
227     @SuppressWarnings("unchecked")
228     private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final TestKit kit, final String msg) {
229         Object reply = kit.expectMsgAnyClassOf(kit.duration("5 sec"), msgClass, Failure.class);
230         if (reply instanceof Failure) {
231             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
232         }
233
234         return (T)reply;
235     }
236
237     @Test
238     public void testPerShardDatastoreContext() throws Exception {
239         LOG.info("testPerShardDatastoreContext starting");
240         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
241                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
242
243         doReturn(
244                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(6).build())
245                 .when(mockFactory).getShardDatastoreContext("default");
246
247         doReturn(
248                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(7).build())
249                 .when(mockFactory).getShardDatastoreContext("topology");
250
251         final MockConfiguration mockConfig = new MockConfiguration() {
252             @Override
253             public Collection<String> getMemberShardNames(final MemberName memberName) {
254                 return Arrays.asList("default", "topology");
255             }
256
257             @Override
258             public Collection<MemberName> getMembersFromShardName(final String shardName) {
259                 return members("member-1");
260             }
261         };
262
263         final ActorRef defaultShardActor = actorFactory.createActor(
264                 MessageCollectorActor.props(), actorFactory.generateActorId("default"));
265         final ActorRef topologyShardActor = actorFactory.createActor(
266                 MessageCollectorActor.props(), actorFactory.generateActorId("topology"));
267
268         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
269                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
270         shardInfoMap.put("default", new AbstractMap.SimpleEntry<>(defaultShardActor, null));
271         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<>(topologyShardActor, null));
272
273         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
274         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
275         class LocalShardManager extends ShardManager {
276             LocalShardManager(final AbstractShardManagerCreator<?> creator) {
277                 super(creator);
278             }
279
280             @Override
281             protected ActorRef newShardActor(final ShardInformation info) {
282                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
283                 ActorRef ref = null;
284                 if (entry != null) {
285                     ref = entry.getKey();
286                     entry.setValue(info.getDatastoreContext());
287                 }
288
289                 newShardActorLatch.countDown();
290                 return ref;
291             }
292         }
293
294         final Creator<ShardManager> creator = new Creator<ShardManager>() {
295             private static final long serialVersionUID = 1L;
296             @Override
297             public ShardManager create() {
298                 return new LocalShardManager(
299                         new GenericCreator<>(LocalShardManager.class).datastoreContextFactory(mockFactory)
300                                 .primaryShardInfoCache(primaryShardInfoCache).configuration(mockConfig));
301             }
302         };
303
304         final TestKit kit = new TestKit(getSystem());
305
306         final ActorRef shardManager = actorFactory.createActor(Props.create(
307                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
308
309         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
310
311         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
312         assertEquals("getShardElectionTimeoutFactor", 6,
313                 shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
314         assertEquals("getShardElectionTimeoutFactor", 7,
315                 shardInfoMap.get("topology").getValue().getShardElectionTimeoutFactor());
316
317         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
318                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
319         doReturn(
320                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(66).build())
321                 .when(newMockFactory).getShardDatastoreContext("default");
322
323         doReturn(
324                 DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).shardElectionTimeoutFactor(77).build())
325                 .when(newMockFactory).getShardDatastoreContext("topology");
326
327         shardManager.tell(newMockFactory, kit.getRef());
328
329         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor,
330                 DatastoreContext.class);
331         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
332
333         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
334         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
335
336         LOG.info("testPerShardDatastoreContext ending");
337     }
338
339     @Test
340     public void testOnReceiveFindPrimaryForNonExistentShard() {
341         final TestKit kit = new TestKit(getSystem());
342         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
343
344         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
345
346         shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
347
348         kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class);
349     }
350
351     @Test
352     public void testOnReceiveFindPrimaryForLocalLeaderShard() {
353         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
354         final TestKit kit = new TestKit(getSystem());
355         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
356
357         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
358
359         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
360         shardManager.tell(new ActorInitialized(), mockShardActor);
361
362         DataTree mockDataTree = mock(DataTree.class);
363         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
364             DataStoreVersions.CURRENT_VERSION), kit.getRef());
365
366         MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
367         shardManager.tell(
368             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
369             mockShardActor);
370
371         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
372
373         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
374             LocalPrimaryShardFound.class);
375         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
376             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
377         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
378
379         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
380     }
381
382     @Test
383     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() {
384         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
385         final TestKit kit = new TestKit(getSystem());
386         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
387
388         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
389         shardManager.tell(new ActorInitialized(), mockShardActor);
390
391         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
392         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
393         shardManager.tell(
394             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
395             mockShardActor);
396         shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
397             mockShardActor);
398
399         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
400
401         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
402
403         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
404     }
405
406     @Test
407     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() {
408         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
409         final TestKit kit = new TestKit(getSystem());
410         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
411
412         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
413         shardManager.tell(new ActorInitialized(), mockShardActor);
414
415         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
416         MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString());
417
418         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
419         shardManager.tell(
420             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
421             mockShardActor);
422         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
423         shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, leaderVersion), mockShardActor);
424
425         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
426
427         RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
428             RemotePrimaryShardFound.class);
429         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
430             primaryFound.getPrimaryPath().contains("member-2-shard-default"));
431         assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
432
433         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
434     }
435
436     @Test
437     public void testOnReceiveFindPrimaryForUninitializedShard() {
438         final TestKit kit = new TestKit(getSystem());
439         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
440
441         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
442
443         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
444     }
445
446     @Test
447     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() {
448         final TestKit kit = new TestKit(getSystem());
449         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
450
451         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
452         shardManager.tell(new ActorInitialized(), mockShardActor);
453
454         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
455
456         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
457     }
458
459     @Test
460     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() {
461         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
462         final TestKit kit = new TestKit(getSystem());
463         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
464
465         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
466         shardManager.tell(new ActorInitialized(), mockShardActor);
467
468         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
469         shardManager.tell(
470             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()),
471             mockShardActor);
472
473         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
474
475         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
476
477         DataTree mockDataTree = mock(DataTree.class);
478         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
479             DataStoreVersions.CURRENT_VERSION), mockShardActor);
480
481         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
482
483         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
484             LocalPrimaryShardFound.class);
485         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
486             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
487         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
488
489         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
490     }
491
492     @Test
493     public void testOnReceiveFindPrimaryWaitForShardLeader() {
494         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
495         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
496         final TestKit kit = new TestKit(getSystem());
497         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
498
499         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
500
501         // We're passing waitUntilInitialized = true to FindPrimary so
502         // the response should be
503         // delayed until we send ActorInitialized and
504         // RoleChangeNotification.
505         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
506
507         kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
508
509         shardManager.tell(new ActorInitialized(), mockShardActor);
510
511         kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
512
513         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
514         shardManager.tell(
515             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
516             mockShardActor);
517
518         kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
519
520         DataTree mockDataTree = mock(DataTree.class);
521         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
522             DataStoreVersions.CURRENT_VERSION), mockShardActor);
523
524         LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
525             LocalPrimaryShardFound.class);
526         assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
527             primaryFound.getPrimaryPath().contains("member-1-shard-default"));
528         assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
529
530         kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
531
532         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
533     }
534
535     @Test
536     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() {
537         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
538         final TestKit kit = new TestKit(getSystem());
539         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
540
541         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
542
543         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
544
545         kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class);
546
547         shardManager.tell(new ActorInitialized(), mockShardActor);
548
549         kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
550
551         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
552     }
553
554     @Test
555     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() {
556         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
557         final TestKit kit = new TestKit(getSystem());
558         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
559
560         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
561         shardManager.tell(new ActorInitialized(), mockShardActor);
562         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
563             RaftState.Candidate.name()), mockShardActor);
564
565         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
566
567         kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
568
569         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
570     }
571
572     @Test
573     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() {
574         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
575         final TestKit kit = new TestKit(getSystem());
576         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
577
578         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
579         shardManager.tell(new ActorInitialized(), mockShardActor);
580         shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null,
581             RaftState.IsolatedLeader.name()), mockShardActor);
582
583         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
584
585         kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
586
587         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
588     }
589
590     @Test
591     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() {
592         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
593         final TestKit kit = new TestKit(getSystem());
594         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
595
596         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
597         shardManager.tell(new ActorInitialized(), mockShardActor);
598
599         shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
600
601         kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
602
603         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
604     }
605
606     @Test
607     public void testOnReceiveFindPrimaryForRemoteShard() {
608         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
609         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
610
611         // Create an ActorSystem ShardManager actor for member-1.
612
613         final ActorSystem system1 = newActorSystem("Member1");
614         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
615
616         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
617                 newTestShardMgrBuilderWithMockShardActor().cluster(
618                         new ClusterWrapperImpl(system1)).props().withDispatcher(
619                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
620
621         // Create an ActorSystem ShardManager actor for member-2.
622
623         final ActorSystem system2 = newActorSystem("Member2");
624
625         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
626
627         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
628
629         MockConfiguration mockConfig2 = new MockConfiguration(
630                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
631                         .put("astronauts", Arrays.asList("member-2")).build());
632
633         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
634                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
635                         new ClusterWrapperImpl(system2)).props().withDispatcher(
636                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
637
638         final TestKit kit = new TestKit(system1);
639         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
640         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
641
642         shardManager2.tell(new ActorInitialized(), mockShardActor2);
643
644         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
645         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
646         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
647             mockShardActor2);
648         shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
649             mockShardActor2);
650
651         shardManager1.underlyingActor().waitForMemberUp();
652         shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
653
654         RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
655         String path = found.getPrimaryPath();
656         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
657         assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
658
659         shardManager2.underlyingActor().verifyFindPrimary();
660
661         // This part times out quite a bit on jenkins for some reason
662
663 //                Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
664 //
665 //                shardManager1.underlyingActor().waitForMemberRemoved();
666 //
667 //                shardManager1.tell(new FindPrimary("astronauts", false), getRef());
668 //
669 //                expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
670
671         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
672     }
673
674     @Test
675     public void testShardAvailabilityOnChangeOfMemberReachability() {
676         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
677         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
678
679         // Create an ActorSystem ShardManager actor for member-1.
680
681         final ActorSystem system1 = newActorSystem("Member1");
682         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
683
684         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
685
686         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
687                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
688                         new ClusterWrapperImpl(system1)).props().withDispatcher(
689                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
690
691         // Create an ActorSystem ShardManager actor for member-2.
692
693         final ActorSystem system2 = newActorSystem("Member2");
694
695         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
696
697         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
698
699         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
700                 .put("default", Arrays.asList("member-1", "member-2")).build());
701
702         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
703                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
704                         new ClusterWrapperImpl(system2)).props().withDispatcher(
705                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
706
707         final TestKit kit = new TestKit(system1);
708         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
709         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
710         shardManager1.tell(new ActorInitialized(), mockShardActor1);
711         shardManager2.tell(new ActorInitialized(), mockShardActor2);
712
713         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
714         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
715         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
716             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
717         shardManager1.tell(
718             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
719             mockShardActor1);
720         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
721             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
722         shardManager2.tell(
723             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
724             mockShardActor2);
725         shardManager1.underlyingActor().waitForMemberUp();
726
727         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
728
729         RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
730         String path = found.getPrimaryPath();
731         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
732
733         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
734             kit.getRef());
735
736         shardManager1.underlyingActor().waitForUnreachableMember();
737
738         PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
739         assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
740         MessageCollectorActor.clearMessages(mockShardActor1);
741
742         shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
743             kit.getRef());
744
745         MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
746
747         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
748
749         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
750
751         shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
752             kit.getRef());
753
754         shardManager1.underlyingActor().waitForReachableMember();
755
756         PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
757         assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
758         MessageCollectorActor.clearMessages(mockShardActor1);
759
760         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
761
762         RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
763         String path1 = found1.getPrimaryPath();
764         assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
765
766         shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
767             kit.getRef());
768
769         MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
770
771         // Test FindPrimary wait succeeds after reachable member event.
772
773         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
774                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
775         shardManager1.underlyingActor().waitForUnreachableMember();
776
777         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
778
779         shardManager1.tell(
780             MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
781
782         RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
783         String path2 = found2.getPrimaryPath();
784         assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
785
786         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
787     }
788
789     @Test
790     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() {
791         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
792         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
793
794         // Create an ActorSystem ShardManager actor for member-1.
795
796         final ActorSystem system1 = newActorSystem("Member1");
797         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
798
799         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
800
801         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
802         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
803                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(new ClusterWrapperImpl(system1))
804                         .primaryShardInfoCache(primaryShardInfoCache).props()
805                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
806                 shardManagerID);
807
808         // Create an ActorSystem ShardManager actor for member-2.
809
810         final ActorSystem system2 = newActorSystem("Member2");
811
812         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
813
814         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
815
816         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
817                 .put("default", Arrays.asList("member-1", "member-2")).build());
818
819         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
820                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
821                         new ClusterWrapperImpl(system2)).props().withDispatcher(
822                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
823
824         final TestKit kit = new TestKit(system1);
825         shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
826         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
827         shardManager1.tell(new ActorInitialized(), mockShardActor1);
828         shardManager2.tell(new ActorInitialized(), mockShardActor2);
829
830         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
831         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
832         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class),
833             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
834         shardManager1.tell(
835             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
836             mockShardActor1);
837         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class),
838             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
839         shardManager2.tell(
840             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
841             mockShardActor2);
842         shardManager1.underlyingActor().waitForMemberUp();
843
844         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
845
846         RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
847         String path = found.getPrimaryPath();
848         assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
849
850         primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(
851             system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
852
853         shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
854                 "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
855
856         shardManager1.underlyingActor().waitForUnreachableMember();
857
858         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
859
860         kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
861
862         assertNull("Expected primaryShardInfoCache entry removed",
863             primaryShardInfoCache.getIfPresent("default"));
864
865         shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, mock(DataTree.class),
866             DataStoreVersions.CURRENT_VERSION), mockShardActor1);
867         shardManager1.tell(
868             new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()),
869             mockShardActor1);
870
871         shardManager1.tell(new FindPrimary("default", true), kit.getRef());
872
873         LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class);
874         String path1 = found1.getPrimaryPath();
875         assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
876
877         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
878     }
879
880     @Test
881     public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() {
882         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting");
883         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
884
885         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
886                 .put("default", Arrays.asList("member-256", "member-2")).build());
887
888         // Create an ActorSystem, ShardManager and actor for member-256.
889
890         final ActorSystem system256 = newActorSystem("Member256");
891         // 2562 is the tcp port of Member256 in src/test/resources/application.conf.
892         Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
893
894         final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256");
895
896         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
897
898         // ShardManager must be created with shard configuration to let its localShards has shards.
899         final TestActorRef<TestShardManager> shardManager256 = TestActorRef.create(system256,
900                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256)
901                         .cluster(new ClusterWrapperImpl(system256))
902                         .primaryShardInfoCache(primaryShardInfoCache).props()
903                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
904                 shardManagerID);
905
906         // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256.
907
908         final ActorSystem system2 = newActorSystem("Member2");
909
910         // Join member-2 into the cluster of member-256.
911         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562"));
912
913         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
914
915         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
916                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster(
917                         new ClusterWrapperImpl(system2)).props().withDispatcher(
918                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
919
920         final TestKit kit256 = new TestKit(system256);
921         shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
922         shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef());
923         shardManager256.tell(new ActorInitialized(), mockShardActor256);
924         shardManager2.tell(new ActorInitialized(), mockShardActor2);
925
926         String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix;
927         String memberId2   = "member-2-shard-default-"   + shardMrgIDSuffix;
928         shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class),
929             DataStoreVersions.CURRENT_VERSION), mockShardActor256);
930         shardManager256.tell(
931             new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()),
932             mockShardActor256);
933         shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class),
934             DataStoreVersions.CURRENT_VERSION), mockShardActor2);
935         shardManager2.tell(
936             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()),
937             mockShardActor2);
938         shardManager256.underlyingActor().waitForMemberUp();
939
940         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
941
942         LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"),
943             LocalPrimaryShardFound.class);
944         String path = found.getPrimaryPath();
945         assertTrue("Unexpected primary path " + path + " which must on member-256",
946             path.contains("member-256-shard-default-config"));
947
948         PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo(
949             system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION);
950         primaryShardInfoCache.putSuccessful("default", primaryShardInfo);
951
952         // Simulate member-2 become unreachable.
953         shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2",
954                 "akka://cluster-test@127.0.0.1:2558"), kit256.getRef());
955         shardManager256.underlyingActor().waitForUnreachableMember();
956
957         // Make sure leader shard on member-256 is still leader and still in the cache.
958         shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
959         found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class);
960         path = found.getPrimaryPath();
961         assertTrue("Unexpected primary path " + path + " which must still not on member-256",
962             path.contains("member-256-shard-default-config"));
963         Future<PrimaryShardInfo> futurePrimaryShard = primaryShardInfoCache.getIfPresent("default");
964         futurePrimaryShard.onComplete(new OnComplete<PrimaryShardInfo>() {
965             @Override
966             public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) {
967                 if (failure != null) {
968                     assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false);
969                 } else {
970                     assertEquals("Expected primaryShardInfoCache entry",
971                         primaryShardInfo, futurePrimaryShardInfo);
972                 }
973             }
974         }, system256.dispatchers().defaultGlobalDispatcher());
975
976         LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending");
977     }
978
979     @Test
980     public void testOnReceiveFindLocalShardForNonExistentShard() {
981         final TestKit kit = new TestKit(getSystem());
982         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
983
984         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
985
986         shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
987
988         LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
989
990         assertEquals("getShardName", "non-existent", notFound.getShardName());
991     }
992
993     @Test
994     public void testOnReceiveFindLocalShardForExistentShard() {
995         final TestKit kit = new TestKit(getSystem());
996         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
997
998         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
999         shardManager.tell(new ActorInitialized(), mockShardActor);
1000
1001         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1002
1003         LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1004
1005         assertTrue("Found path contains " + found.getPath().path().toString(),
1006             found.getPath().path().toString().contains("member-1-shard-default-config"));
1007     }
1008
1009     @Test
1010     public void testOnReceiveFindLocalShardForNotInitializedShard() {
1011         final TestKit kit = new TestKit(getSystem());
1012         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1013
1014         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1015
1016         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1017     }
1018
1019     @Test
1020     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
1021         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1022         final TestKit kit = new TestKit(getSystem());
1023         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1024
1025         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1026
1027         // We're passing waitUntilInitialized = true to FindLocalShard
1028         // so the response should be
1029         // delayed until we send ActorInitialized.
1030         Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
1031             new Timeout(5, TimeUnit.SECONDS));
1032
1033         shardManager.tell(new ActorInitialized(), mockShardActor);
1034
1035         Object resp = Await.result(future, kit.duration("5 seconds"));
1036         assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
1037
1038         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
1039     }
1040
1041     @Test
1042     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1043         TestShardManager shardManager = newTestShardManager();
1044
1045         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1046         shardManager.onReceiveCommand(new RoleChangeNotification(
1047                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1048
1049         verify(ready, never()).countDown();
1050
1051         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1052                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1053
1054         verify(ready, times(1)).countDown();
1055     }
1056
1057     @Test
1058     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1059         final TestKit kit = new TestKit(getSystem());
1060         TestShardManager shardManager = newTestShardManager();
1061
1062         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1063         shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1064
1065         verify(ready, never()).countDown();
1066
1067         shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1068
1069         shardManager.onReceiveCommand(
1070             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1071                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1072
1073         verify(ready, times(1)).countDown();
1074     }
1075
1076     @Test
1077     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1078         final TestKit kit = new TestKit(getSystem());
1079         TestShardManager shardManager = newTestShardManager();
1080
1081         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1082         shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1083
1084         verify(ready, never()).countDown();
1085
1086         shardManager.onReceiveCommand(
1087             new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
1088                 mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
1089
1090         shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
1091
1092         verify(ready, times(1)).countDown();
1093     }
1094
1095     @Test
1096     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1097         TestShardManager shardManager = newTestShardManager();
1098
1099         shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
1100             RaftState.Leader.name()));
1101
1102         verify(ready, never()).countDown();
1103     }
1104
1105     @Test
1106     public void testByDefaultSyncStatusIsFalse() {
1107         TestShardManager shardManager = newTestShardManager();
1108
1109         assertEquals(false, shardManager.getMBean().getSyncStatus());
1110     }
1111
1112     @Test
1113     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
1114         TestShardManager shardManager = newTestShardManager();
1115
1116         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1117                 RaftState.Follower.name(), RaftState.Leader.name()));
1118
1119         assertEquals(true, shardManager.getMBean().getSyncStatus());
1120     }
1121
1122     @Test
1123     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception {
1124         TestShardManager shardManager = newTestShardManager();
1125
1126         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1127         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1128                 RaftState.Follower.name(), RaftState.Candidate.name()));
1129
1130         assertEquals(false, shardManager.getMBean().getSyncStatus());
1131
1132         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1133         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1134                 true, shardId));
1135
1136         assertEquals(false, shardManager.getMBean().getSyncStatus());
1137     }
1138
1139     @Test
1140     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception {
1141         TestShardManager shardManager = newTestShardManager();
1142
1143         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1144         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1145                 RaftState.Candidate.name(), RaftState.Follower.name()));
1146
1147         // Initially will be false
1148         assertEquals(false, shardManager.getMBean().getSyncStatus());
1149
1150         // Send status true will make sync status true
1151         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1152
1153         assertEquals(true, shardManager.getMBean().getSyncStatus());
1154
1155         // Send status false will make sync status false
1156         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1157
1158         assertEquals(false, shardManager.getMBean().getSyncStatus());
1159     }
1160
1161     @Test
1162     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception {
1163         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1164         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1165             @Override
1166             public List<String> getMemberShardNames(final MemberName memberName) {
1167                 return Arrays.asList("default", "astronauts");
1168             }
1169         }));
1170
1171         // Initially will be false
1172         assertEquals(false, shardManager.getMBean().getSyncStatus());
1173
1174         // Make default shard leader
1175         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1176         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1177                 RaftState.Follower.name(), RaftState.Leader.name()));
1178
1179         // default = Leader, astronauts is unknown so sync status remains false
1180         assertEquals(false, shardManager.getMBean().getSyncStatus());
1181
1182         // Make astronauts shard leader as well
1183         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1184         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1185                 RaftState.Follower.name(), RaftState.Leader.name()));
1186
1187         // Now sync status should be true
1188         assertEquals(true, shardManager.getMBean().getSyncStatus());
1189
1190         // Make astronauts a Follower
1191         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1192                 RaftState.Leader.name(), RaftState.Follower.name()));
1193
1194         // Sync status is not true
1195         assertEquals(false, shardManager.getMBean().getSyncStatus());
1196
1197         // Make the astronauts follower sync status true
1198         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1199
1200         // Sync status is now true
1201         assertEquals(true, shardManager.getMBean().getSyncStatus());
1202
1203         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1204     }
1205
1206     @Test
1207     public void testOnReceiveSwitchShardBehavior() {
1208         final TestKit kit = new TestKit(getSystem());
1209         final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1210
1211         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1212         shardManager.tell(new ActorInitialized(), mockShardActor);
1213
1214         shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef());
1215
1216         SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor,
1217             SwitchBehavior.class);
1218
1219         assertEquals(RaftState.Leader, switchBehavior.getNewState());
1220         assertEquals(1000, switchBehavior.getNewTerm());
1221     }
1222
1223     private static List<MemberName> members(final String... names) {
1224         return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList());
1225     }
1226
1227     @Test
1228     public void testOnCreateShard() {
1229         LOG.info("testOnCreateShard starting");
1230         final TestKit kit = new TestKit(getSystem());
1231         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1232
1233         ActorRef shardManager = actorFactory
1234                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1235                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1236
1237         SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1238         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1239
1240         DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
1241                 .persistent(false).build();
1242         Shard.Builder shardBuilder = Shard.builder();
1243
1244         ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1245             "foo", null, members("member-1", "member-5", "member-6"));
1246         shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
1247
1248         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1249
1250         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1251
1252         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1253
1254         assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1255         assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
1256             .getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1257         assertEquals("peerMembers", Sets.newHashSet(
1258             ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
1259             ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
1260             shardBuilder.getPeerAddresses().keySet());
1261         assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
1262             shardBuilder.getId());
1263         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1264
1265         // Send CreateShard with same name - should return Success with
1266         // a message.
1267
1268         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1269
1270         Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1271         assertNotNull("Success status is null", success.status());
1272
1273         LOG.info("testOnCreateShard ending");
1274     }
1275
1276     @Test
1277     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1278         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1279         final TestKit kit = new TestKit(getSystem());
1280         datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1281
1282         ActorRef shardManager = actorFactory
1283                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1284                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1285
1286         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1287
1288         Shard.Builder shardBuilder = Shard.builder();
1289         ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1290             "foo", null, members("member-5", "member-6"));
1291
1292         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1293         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1294
1295         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1296         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1297
1298         assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1299         assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
1300             .getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1301
1302         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1303     }
1304
1305     @Test
1306     public void testOnCreateShardWithNoInitialSchemaContext() {
1307         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1308         final TestKit kit = new TestKit(getSystem());
1309         ActorRef shardManager = actorFactory
1310                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1311                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1312
1313         Shard.Builder shardBuilder = Shard.builder();
1314
1315         ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1316             "foo", null, members("member-1"));
1317         shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
1318
1319         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1320
1321         SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
1322         shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1323
1324         shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
1325
1326         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1327
1328         assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1329         assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1330
1331         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1332     }
1333
1334     @Test
1335     public void testGetSnapshot() {
1336         LOG.info("testGetSnapshot starting");
1337         TestKit kit = new TestKit(getSystem());
1338
1339         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1340                 .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1"))
1341                 .put("astronauts", Collections.<String>emptyList()).build());
1342
1343         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)
1344                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1345
1346         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1347         Failure failure = kit.expectMsgClass(Failure.class);
1348         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1349
1350         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1351
1352         waitForShardInitialized(shardManager, "shard1", kit);
1353         waitForShardInitialized(shardManager, "shard2", kit);
1354
1355         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1356
1357         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1358
1359         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1360         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1361
1362         Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
1363
1364         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1365             datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet())));
1366
1367         // Add a new replica
1368
1369         TestKit mockShardLeaderKit = new TestKit(getSystem());
1370
1371         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1372         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1373
1374         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1375         mockShardLeaderKit.expectMsgClass(AddServer.class);
1376         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1377         kit.expectMsgClass(Status.Success.class);
1378         waitForShardInitialized(shardManager, "astronauts", kit);
1379
1380         // Send another GetSnapshot and verify
1381
1382         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1383         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1384
1385         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1386                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1387
1388         ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
1389         assertNotNull("Expected ShardManagerSnapshot", snapshot);
1390         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1391                 Sets.newHashSet(snapshot.getShardList()));
1392
1393         LOG.info("testGetSnapshot ending");
1394     }
1395
1396     @Test
1397     public void testRestoreFromSnapshot() {
1398         LOG.info("testRestoreFromSnapshot starting");
1399
1400         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1401
1402         TestKit kit = new TestKit(getSystem());
1403
1404         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1405                 .put("shard1", Collections.<String>emptyList()).put("shard2", Collections.<String>emptyList())
1406                 .put("astronauts", Collections.<String>emptyList()).build());
1407
1408         ShardManagerSnapshot snapshot =
1409                 new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
1410         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
1411                 Collections.<ShardSnapshot>emptyList());
1412         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
1413                 .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1414
1415         shardManager.underlyingActor().waitForRecoveryComplete();
1416
1417         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
1418
1419         waitForShardInitialized(shardManager, "shard1", kit);
1420         waitForShardInitialized(shardManager, "shard2", kit);
1421         waitForShardInitialized(shardManager, "astronauts", kit);
1422
1423         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1424
1425         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1426
1427         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1428
1429         assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1430         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1431                 Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList()));
1432
1433         LOG.info("testRestoreFromSnapshot ending");
1434     }
1435
1436     @Test
1437     public void testAddShardReplicaForNonExistentShardConfig() {
1438         final TestKit kit = new TestKit(getSystem());
1439         ActorRef shardManager = actorFactory
1440                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1441                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1442
1443         shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
1444         Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class);
1445
1446         assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
1447     }
1448
1449     @Test
1450     public void testAddShardReplica() {
1451         LOG.info("testAddShardReplica starting");
1452         MockConfiguration mockConfig = new MockConfiguration(
1453                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1454                         .put("astronauts", Arrays.asList("member-2")).build());
1455
1456         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1457         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1458
1459         // Create an ActorSystem ShardManager actor for member-1.
1460         final ActorSystem system1 = newActorSystem("Member1");
1461         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1462         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1463         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1464                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
1465                         .cluster(new ClusterWrapperImpl(system1)).props()
1466                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1467                 shardManagerID);
1468
1469         // Create an ActorSystem ShardManager actor for member-2.
1470         final ActorSystem system2 = newActorSystem("Member2");
1471         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1472
1473         String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1474         String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
1475         final TestActorRef<MockRespondActor> mockShardLeaderActor = TestActorRef.create(system2,
1476                 Props.create(MockRespondActor.class, AddServer.class,
1477                         new AddServerReply(ServerChangeStatus.OK, memberId2))
1478                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1479                 name);
1480         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1481                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor)
1482                         .cluster(new ClusterWrapperImpl(system2)).props()
1483                         .withDispatcher(Dispatchers.DefaultDispatcherId()),
1484                 shardManagerID);
1485
1486         final TestKit kit = new TestKit(getSystem());
1487         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1488         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1489
1490         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1491
1492         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1493         leaderShardManager.tell(
1494             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1495             mockShardLeaderActor);
1496         leaderShardManager.tell(
1497             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1498             mockShardLeaderActor);
1499
1500         newReplicaShardManager.underlyingActor().waitForMemberUp();
1501         leaderShardManager.underlyingActor().waitForMemberUp();
1502
1503         // Have a dummy snapshot to be overwritten by the new data
1504         // persisted.
1505         String[] restoredShards = { "default", "people" };
1506         ShardManagerSnapshot snapshot =
1507                 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1508         InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1509         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1510
1511         InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1512         InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1513
1514         // construct a mock response message
1515         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1516         AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1517             AddServer.class);
1518         String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1519         assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1520         kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
1521
1522         InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1523         InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1524         List<ShardManagerSnapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(shardManagerID,
1525             ShardManagerSnapshot.class);
1526         assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1527         ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1528         assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1529             Sets.newHashSet(shardManagerSnapshot.getShardList()));
1530         LOG.info("testAddShardReplica ending");
1531     }
1532
1533     @Test
1534     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() {
1535         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1536         final TestKit kit = new TestKit(getSystem());
1537         TestActorRef<TestShardManager> shardManager = actorFactory
1538                 .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID);
1539
1540         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1541         shardManager.tell(new ActorInitialized(), mockShardActor);
1542
1543         String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1544         AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1545         ActorRef leaderShardActor = shardManager.underlyingActor().getContext()
1546                 .actorOf(Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
1547
1548         MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1549
1550         String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1551         shardManager.tell(
1552             new RoleChangeNotification(newReplicaId, RaftState.Candidate.name(), RaftState.Follower.name()),
1553             mockShardActor);
1554         shardManager.tell(
1555             new ShardLeaderStateChanged(newReplicaId, leaderId, DataStoreVersions.CURRENT_VERSION),
1556             mockShardActor);
1557
1558         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1559
1560         MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1561
1562         Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1563         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1564
1565         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1566         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1567
1568         // Send message again to verify previous in progress state is
1569         // cleared
1570
1571         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1572         resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1573         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1574
1575         // Send message again with an AddServer timeout to verify the
1576         // pre-existing shard actor isn't terminated.
1577
1578         shardManager.tell(
1579             newDatastoreContextFactory(
1580                 datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
1581         leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1582         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1583         kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1584
1585         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1586         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1587
1588         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1589     }
1590
1591     @Test
1592     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() {
1593         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1594         final TestKit kit = new TestKit(getSystem());
1595         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1596         ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1597
1598         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1599         shardManager.tell(new ActorInitialized(), mockShardActor);
1600         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1601             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1602         shardManager.tell(
1603             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1604             mockShardActor);
1605
1606         shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
1607         Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1608         assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1609
1610         shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
1611         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
1612
1613         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1614     }
1615
1616     @Test
1617     public void testAddShardReplicaWithAddServerReplyFailure() {
1618         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1619         final TestKit kit = new TestKit(getSystem());
1620         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1621
1622         MockConfiguration mockConfig = new MockConfiguration(
1623             ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1624
1625         ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1626         final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1627             newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props()
1628             .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1629         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1630
1631         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1632
1633         TestKit terminateWatcher = new TestKit(getSystem());
1634         terminateWatcher.watch(mockNewReplicaShardActor);
1635
1636         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1637
1638         AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1639         assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1640             addServerMsg.getNewServerId());
1641         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1642
1643         Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1644         assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1645
1646         shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1647         kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
1648
1649         terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1650
1651         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1652         mockShardLeaderKit.expectMsgClass(AddServer.class);
1653         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1654         failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
1655         assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1656
1657         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1658     }
1659
1660     @Test
1661     public void testAddShardReplicaWithAlreadyInProgress() {
1662         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1663                 AddServer.class, new AddShardReplica("astronauts"));
1664     }
1665
1666     @Test
1667     public void testAddShardReplicaWithFindPrimaryTimeout() {
1668         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1669         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1670         final TestKit kit = new TestKit(getSystem());
1671         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.of("astronauts", Arrays.asList("member-2")));
1672
1673         final ActorRef newReplicaShardManager = actorFactory
1674                 .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props()
1675                     .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID);
1676
1677         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1678         MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1679             AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
1680
1681         newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1682         Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
1683         assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException);
1684
1685         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1686     }
1687
1688     @Test
1689     public void testRemoveShardReplicaForNonExistentShard() {
1690         final TestKit kit = new TestKit(getSystem());
1691         ActorRef shardManager = actorFactory
1692                 .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
1693                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1694
1695         shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
1696         Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class);
1697         assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException);
1698     }
1699
1700     @Test
1701     /**
1702      * Primary is Local.
1703      */
1704     public void testRemoveShardReplicaLocal() {
1705         final TestKit kit = new TestKit(getSystem());
1706         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1707
1708         final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class,
1709             RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
1710
1711         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1712
1713         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1714         shardManager.tell(new ActorInitialized(), respondActor);
1715         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
1716             DataStoreVersions.CURRENT_VERSION), kit.getRef());
1717         shardManager.tell(
1718             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
1719             respondActor);
1720
1721         shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), kit.getRef());
1722         final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor,
1723             RemoveServer.class);
1724         assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
1725             removeServer.getServerId());
1726         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
1727     }
1728
1729     @Test
1730     public void testRemoveShardReplicaRemote() {
1731         MockConfiguration mockConfig = new MockConfiguration(
1732                 ImmutableMap.<String, List<String>>builder().put("default", Arrays.asList("member-1", "member-2"))
1733                         .put("astronauts", Arrays.asList("member-1")).build());
1734
1735         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1736
1737         // Create an ActorSystem ShardManager actor for member-1.
1738         final ActorSystem system1 = newActorSystem("Member1");
1739         Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1740         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1741
1742         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1743                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1744                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1745                 shardManagerID);
1746
1747         // Create an ActorSystem ShardManager actor for member-2.
1748         final ActorSystem system2 = newActorSystem("Member2");
1749         Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
1750
1751         String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
1752         String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1753         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1754                 TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
1755                         new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
1756
1757         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1758
1759         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1760                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1761                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1762                 shardManagerID);
1763
1764         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1765         //    akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1766         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1767         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1768         // look like so,
1769         //    akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1770         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1771         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1772         // dead letters.
1773         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1774         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1775         // thing works as expected
1776         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1777                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor),
1778                         "member-2-shard-default-" + shardMrgIDSuffix);
1779
1780         LOG.error("Forwarding actor : {}", actorRef);
1781
1782         final TestKit kit = new TestKit(getSystem());
1783         newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1784         leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1785
1786         leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1787         newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1788
1789         short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1790         leaderShardManager.tell(
1791             new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion),
1792             mockShardLeaderActor);
1793         leaderShardManager.tell(
1794             new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()),
1795             mockShardLeaderActor);
1796
1797         String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1798         newReplicaShardManager.tell(
1799             new ShardLeaderStateChanged(memberId1, memberId2, mock(DataTree.class), leaderVersion),
1800             mockShardActor);
1801         newReplicaShardManager.tell(
1802             new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
1803             mockShardActor);
1804
1805         newReplicaShardManager.underlyingActor().waitForMemberUp();
1806         leaderShardManager.underlyingActor().waitForMemberUp();
1807
1808         // construct a mock response message
1809         newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), kit.getRef());
1810         RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1811             RemoveServer.class);
1812         String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1813         assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1814         kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
1815     }
1816
1817     @Test
1818     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() {
1819         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", MEMBER_2),
1820                 RemoveServer.class, new RemoveShardReplica("astronauts", MEMBER_3));
1821     }
1822
1823     @Test
1824     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() {
1825         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1826                 AddServer.class, new RemoveShardReplica("astronauts", MEMBER_2));
1827     }
1828
1829
1830     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1831                                                       final Class<?> firstForwardedServerChangeClass,
1832                                                       final Object secondServerChange) {
1833         final TestKit kit = new TestKit(getSystem());
1834         final TestKit mockShardLeaderKit = new TestKit(getSystem());
1835         final TestKit secondRequestKit = new TestKit(getSystem());
1836
1837         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1838             .put(shardName, Arrays.asList("member-2")).build());
1839
1840         final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1841             newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor)
1842             .cluster(new MockClusterWrapper()).props()
1843             .withDispatcher(Dispatchers.DefaultDispatcherId()),
1844             shardMgrID);
1845
1846         shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1847
1848         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1849
1850         shardManager.tell(firstServerChange, kit.getRef());
1851
1852         mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1853
1854         shardManager.tell(secondServerChange, secondRequestKit.getRef());
1855
1856         secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class);
1857     }
1858
1859     @Test
1860     public void testServerRemovedShardActorNotRunning() {
1861         LOG.info("testServerRemovedShardActorNotRunning starting");
1862         final TestKit kit = new TestKit(getSystem());
1863         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1864             .put("default", Arrays.asList("member-1", "member-2"))
1865             .put("astronauts", Arrays.asList("member-2"))
1866             .put("people", Arrays.asList("member-1", "member-2")).build());
1867
1868         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1869             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1870
1871         shardManager.underlyingActor().waitForRecoveryComplete();
1872         shardManager.tell(new FindLocalShard("people", false), kit.getRef());
1873         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1874
1875         shardManager.tell(new FindLocalShard("default", false), kit.getRef());
1876         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1877
1878         // Removed the default shard replica from member-1
1879         ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1880         ShardIdentifier shardId = builder.shardName("default").memberName(MEMBER_1).type(shardMrgIDSuffix)
1881                 .build();
1882         shardManager.tell(new ServerRemoved(shardId.toString()), kit.getRef());
1883
1884         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1885
1886         LOG.info("testServerRemovedShardActorNotRunning ending");
1887     }
1888
1889     @Test
1890     public void testServerRemovedShardActorRunning() {
1891         LOG.info("testServerRemovedShardActorRunning starting");
1892         final TestKit kit = new TestKit(getSystem());
1893         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1894             .put("default", Arrays.asList("member-1", "member-2"))
1895             .put("astronauts", Arrays.asList("member-2"))
1896             .put("people", Arrays.asList("member-1", "member-2")).build());
1897
1898         String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
1899         ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId);
1900
1901         TestActorRef<TestShardManager> shardManager = actorFactory
1902                 .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()
1903                     .withDispatcher(Dispatchers.DefaultDispatcherId()));
1904
1905         shardManager.underlyingActor().waitForRecoveryComplete();
1906
1907         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1908         shardManager.tell(new ActorInitialized(), shard);
1909
1910         waitForShardInitialized(shardManager, "people", kit);
1911         waitForShardInitialized(shardManager, "default", kit);
1912
1913         // Removed the default shard replica from member-1
1914         shardManager.tell(new ServerRemoved(shardId), kit.getRef());
1915
1916         shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1917
1918         MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1919
1920         LOG.info("testServerRemovedShardActorRunning ending");
1921     }
1922
1923     @Test
1924     public void testShardPersistenceWithRestoredData() {
1925         LOG.info("testShardPersistenceWithRestoredData starting");
1926         final TestKit kit = new TestKit(getSystem());
1927         MockConfiguration mockConfig =
1928                 new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1929                     .put("default", Arrays.asList("member-1", "member-2"))
1930                     .put("astronauts", Arrays.asList("member-2"))
1931                     .put("people", Arrays.asList("member-1", "member-2")).build());
1932         String[] restoredShards = {"default", "astronauts"};
1933         ShardManagerSnapshot snapshot =
1934                 new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
1935         InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1936
1937         // create shardManager to come up with restored data
1938         TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1939             newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId()));
1940
1941         newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1942
1943         newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
1944         LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
1945         assertEquals("for uninitialized shard", "people", notFound.getShardName());
1946
1947         // Verify a local shard is created for the restored shards,
1948         // although we expect a NotInitializedException for the shards
1949         // as the actor initialization
1950         // message is not sent for them
1951         newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
1952         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1953
1954         newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
1955         kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
1956
1957         LOG.info("testShardPersistenceWithRestoredData ending");
1958     }
1959
1960     @Test
1961     public void testShutDown() throws Exception {
1962         LOG.info("testShutDown starting");
1963         final TestKit kit = new TestKit(getSystem());
1964         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder()
1965             .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build());
1966
1967         String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
1968         ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1);
1969
1970         String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
1971         ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2);
1972
1973         ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig)
1974             .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props());
1975
1976         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
1977         shardManager.tell(new ActorInitialized(), shard1);
1978         shardManager.tell(new ActorInitialized(), shard2);
1979
1980         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1981         Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1982
1983         MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1984         MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1985
1986         try {
1987             Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1988             fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1989         } catch (TimeoutException e) {
1990             // expected
1991         }
1992
1993         actorFactory.killActor(shard1, kit);
1994         actorFactory.killActor(shard2, kit);
1995
1996         Boolean stopped = Await.result(stopFuture, duration);
1997         assertEquals("Stopped", Boolean.TRUE, stopped);
1998
1999         LOG.info("testShutDown ending");
2000     }
2001
2002     @Test
2003     public void testChangeServersVotingStatus() {
2004         final TestKit kit = new TestKit(getSystem());
2005         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2006
2007         ActorRef respondActor = actorFactory
2008                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2009                     new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
2010
2011         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2012
2013         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2014         shardManager.tell(new ActorInitialized(), respondActor);
2015         shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
2016             DataStoreVersions.CURRENT_VERSION), kit.getRef());
2017         shardManager.tell(
2018             new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
2019             respondActor);
2020
2021         shardManager.tell(
2022             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2023
2024         ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor
2025                 .expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2026         assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
2027             ImmutableMap.of(ShardIdentifier
2028                 .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
2029                 Boolean.TRUE));
2030
2031         kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
2032     }
2033
2034     @Test
2035     public void testChangeServersVotingStatusWithNoLeader() {
2036         final TestKit kit = new TestKit(getSystem());
2037         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
2038
2039         ActorRef respondActor = actorFactory
2040                 .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
2041                     new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
2042
2043         ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
2044
2045         shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
2046         shardManager.tell(new ActorInitialized(), respondActor);
2047         shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor);
2048
2049         shardManager.tell(
2050             new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)), kit.getRef());
2051
2052         MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
2053
2054         Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
2055         assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException);
2056     }
2057
2058     public static class TestShardManager extends ShardManager {
2059         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
2060         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
2061         private ShardManagerSnapshot snapshot;
2062         private final Map<String, ActorRef> shardActors;
2063         private final ActorRef shardActor;
2064         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
2065         private CountDownLatch memberUpReceived = new CountDownLatch(1);
2066         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
2067         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
2068         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
2069         private volatile MessageInterceptor messageInterceptor;
2070
2071         TestShardManager(final Builder builder) {
2072             super(builder);
2073             shardActor = builder.shardActor;
2074             shardActors = builder.shardActors;
2075         }
2076
2077         @Override
2078         protected void handleRecover(final Object message) throws Exception {
2079             try {
2080                 super.handleRecover(message);
2081             } finally {
2082                 if (message instanceof RecoveryCompleted) {
2083                     recoveryComplete.countDown();
2084                 }
2085             }
2086         }
2087
2088         private void countDownIfOther(final Member member, final CountDownLatch latch) {
2089             if (!getCluster().getCurrentMemberName().equals(memberToName(member))) {
2090                 latch.countDown();
2091             }
2092         }
2093
2094         @Override
2095         public void handleCommand(final Object message) throws Exception {
2096             try {
2097                 if (messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2098                     getSender().tell(messageInterceptor.apply(message), getSelf());
2099                 } else {
2100                     super.handleCommand(message);
2101                 }
2102             } finally {
2103                 if (message instanceof FindPrimary) {
2104                     findPrimaryMessageReceived.countDown();
2105                 } else if (message instanceof ClusterEvent.MemberUp) {
2106                     countDownIfOther(((ClusterEvent.MemberUp) message).member(), memberUpReceived);
2107                 } else if (message instanceof ClusterEvent.MemberRemoved) {
2108                     countDownIfOther(((ClusterEvent.MemberRemoved) message).member(), memberRemovedReceived);
2109                 } else if (message instanceof ClusterEvent.UnreachableMember) {
2110                     countDownIfOther(((ClusterEvent.UnreachableMember) message).member(), memberUnreachableReceived);
2111                 } else if (message instanceof ClusterEvent.ReachableMember) {
2112                     countDownIfOther(((ClusterEvent.ReachableMember) message).member(), memberReachableReceived);
2113                 }
2114             }
2115         }
2116
2117         void setMessageInterceptor(final MessageInterceptor messageInterceptor) {
2118             this.messageInterceptor = messageInterceptor;
2119         }
2120
2121         void waitForRecoveryComplete() {
2122             assertEquals("Recovery complete", true,
2123                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2124         }
2125
2126         public void waitForMemberUp() {
2127             assertEquals("MemberUp received", true,
2128                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2129             memberUpReceived = new CountDownLatch(1);
2130         }
2131
2132         void waitForMemberRemoved() {
2133             assertEquals("MemberRemoved received", true,
2134                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2135             memberRemovedReceived = new CountDownLatch(1);
2136         }
2137
2138         void waitForUnreachableMember() {
2139             assertEquals("UnreachableMember received", true,
2140                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2141                 ));
2142             memberUnreachableReceived = new CountDownLatch(1);
2143         }
2144
2145         void waitForReachableMember() {
2146             assertEquals("ReachableMember received", true,
2147                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2148             memberReachableReceived = new CountDownLatch(1);
2149         }
2150
2151         void verifyFindPrimary() {
2152             assertEquals("FindPrimary received", true,
2153                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2154             findPrimaryMessageReceived = new CountDownLatch(1);
2155         }
2156
2157         public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) {
2158             return new Builder(datastoreContextBuilder);
2159         }
2160
2161         public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
2162             private ActorRef shardActor;
2163             private final Map<String, ActorRef> shardActors = new HashMap<>();
2164
2165             Builder(final DatastoreContext.Builder datastoreContextBuilder) {
2166                 super(TestShardManager.class);
2167                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2168             }
2169
2170             Builder shardActor(final ActorRef newShardActor) {
2171                 this.shardActor = newShardActor;
2172                 return this;
2173             }
2174
2175             Builder addShardActor(final String shardName, final ActorRef actorRef) {
2176                 shardActors.put(shardName, actorRef);
2177                 return this;
2178             }
2179         }
2180
2181         @Override
2182         public void saveSnapshot(final Object obj) {
2183             snapshot = (ShardManagerSnapshot) obj;
2184             snapshotPersist.countDown();
2185             super.saveSnapshot(obj);
2186         }
2187
2188         void verifySnapshotPersisted(final Set<String> shardList) {
2189             assertEquals("saveSnapshot invoked", true,
2190                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2191             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2192         }
2193
2194         @Override
2195         protected ActorRef newShardActor(final ShardInformation info) {
2196             if (shardActors.get(info.getShardName()) != null) {
2197                 return shardActors.get(info.getShardName());
2198             }
2199
2200             if (shardActor != null) {
2201                 return shardActor;
2202             }
2203
2204             return super.newShardActor(info);
2205         }
2206     }
2207
2208     private abstract static class AbstractGenericCreator<T extends AbstractGenericCreator<T, ?>, C extends ShardManager>
2209                                                      extends AbstractShardManagerCreator<T> {
2210         private final Class<C> shardManagerClass;
2211
2212         AbstractGenericCreator(final Class<C> shardManagerClass) {
2213             this.shardManagerClass = shardManagerClass;
2214             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
2215                     .primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2216         }
2217
2218         @Override
2219         public Props props() {
2220             verify();
2221             return Props.create(shardManagerClass, this);
2222         }
2223     }
2224
2225     private static class GenericCreator<C extends ShardManager> extends AbstractGenericCreator<GenericCreator<C>, C> {
2226         GenericCreator(final Class<C> shardManagerClass) {
2227             super(shardManagerClass);
2228         }
2229     }
2230
2231     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2232         private static final long serialVersionUID = 1L;
2233         private final Creator<ShardManager> delegate;
2234
2235         DelegatingShardManagerCreator(final Creator<ShardManager> delegate) {
2236             this.delegate = delegate;
2237         }
2238
2239         @Override
2240         public ShardManager create() throws Exception {
2241             return delegate.create();
2242         }
2243     }
2244
2245     interface MessageInterceptor extends Function<Object, Object> {
2246         boolean canIntercept(Object message);
2247     }
2248
2249     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2250         return new MessageInterceptor() {
2251             @Override
2252             public Object apply(final Object message) {
2253                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2254             }
2255
2256             @Override
2257             public boolean canIntercept(final Object message) {
2258                 return message instanceof FindPrimary;
2259             }
2260         };
2261     }
2262
2263     private static class MockRespondActor extends MessageCollectorActor {
2264         static final String CLEAR_RESPONSE = "clear-response";
2265
2266         private Object responseMsg;
2267         private final Class<?> requestClass;
2268
2269         @SuppressWarnings("unused")
2270         MockRespondActor(final Class<?> requestClass, final Object responseMsg) {
2271             this.requestClass = requestClass;
2272             this.responseMsg = responseMsg;
2273         }
2274
2275         @Override
2276         public void onReceive(final Object message) throws Exception {
2277             if (message.equals(CLEAR_RESPONSE)) {
2278                 responseMsg = null;
2279             } else {
2280                 super.onReceive(message);
2281                 if (message.getClass().equals(requestClass) && responseMsg != null) {
2282                     getSender().tell(responseMsg, getSelf());
2283                 }
2284             }
2285         }
2286     }
2287 }