2525ba78b95a82278dcf36c3ba829053fedb2525
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.actor.Status;
25 import akka.actor.Status.Failure;
26 import akka.actor.Status.Success;
27 import akka.actor.Terminated;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.dispatch.Dispatchers;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.RecoveryCompleted;
34 import akka.serialization.Serialization;
35 import akka.testkit.JavaTestKit;
36 import akka.testkit.TestActorRef;
37 import akka.util.Timeout;
38 import com.google.common.base.Function;
39 import com.google.common.base.Optional;
40 import com.google.common.base.Stopwatch;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.typesafe.config.ConfigFactory;
47 import java.net.URI;
48 import java.util.AbstractMap;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Map.Entry;
57 import java.util.Set;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import org.apache.commons.lang3.SerializationUtils;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.mockito.Mock;
66 import org.mockito.Mockito;
67 import org.mockito.MockitoAnnotations;
68 import org.opendaylight.controller.cluster.datastore.config.Configuration;
69 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
70 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
71 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
72 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
73 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
74 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
75 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
76 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
77 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
78 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
79 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
81 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
82 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
83 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
84 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
85 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
86 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
87 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
88 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
89 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
90 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
91 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
92 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
93 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
94 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
95 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
96 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
97 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
98 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
99 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
100 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
101 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
102 import org.opendaylight.controller.cluster.raft.RaftState;
103 import org.opendaylight.controller.cluster.raft.TestActorFactory;
104 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
105 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
106 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
107 import org.opendaylight.controller.cluster.raft.messages.AddServer;
108 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
109 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
110 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
111 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
112 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
113 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
114 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
115 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import org.slf4j.Logger;
119 import org.slf4j.LoggerFactory;
120 import scala.concurrent.Await;
121 import scala.concurrent.Future;
122 import scala.concurrent.duration.FiniteDuration;
123
124 public class ShardManagerTest extends AbstractActorTest {
125     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
126
127     private static int ID_COUNTER = 1;
128
129     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
130     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
131
132     @Mock
133     private static CountDownLatch ready;
134
135     private static TestActorRef<MessageCollectorActor> mockShardActor;
136
137     private static String mockShardName;
138
139     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
140             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
141                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
142
143     private final Collection<ActorSystem> actorSystems = new ArrayList<>();
144
145     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
146
147     @Before
148     public void setUp() {
149         MockitoAnnotations.initMocks(this);
150
151         InMemoryJournal.clear();
152         InMemorySnapshotStore.clear();
153
154         if(mockShardActor == null) {
155             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
156             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
157         }
158
159         mockShardActor.underlyingActor().clear();
160     }
161
162     @After
163     public void tearDown() {
164         InMemoryJournal.clear();
165         InMemorySnapshotStore.clear();
166
167         for(ActorSystem system: actorSystems) {
168             JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
169         }
170
171         actorFactory.close();
172     }
173
174     private ActorSystem newActorSystem(String config) {
175         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
176         actorSystems.add(system);
177         return system;
178     }
179
180     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
181         String name = new ShardIdentifier(shardName, memberName,"config").toString();
182         if(system == getSystem()) {
183             return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
184         }
185
186         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
187     }
188
189     private Props newShardMgrProps() {
190         return newShardMgrProps(new MockConfiguration());
191     }
192
193     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
194         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
195         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
196         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
197         return mockFactory;
198     }
199
200     private TestShardManager.Builder newTestShardMgrBuilder() {
201         return TestShardManager.builder(datastoreContextBuilder);
202     }
203
204     private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
205         return TestShardManager.builder(datastoreContextBuilder).configuration(config);
206     }
207
208     private Props newShardMgrProps(Configuration config) {
209         return newTestShardMgrBuilder(config).props();
210     }
211
212     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
213         return TestShardManager.builder(datastoreContextBuilder).shardActor(mockShardActor);
214     }
215
216     private Props newPropsShardMgrWithMockShardActor() {
217         return newTestShardMgrBuilderWithMockShardActor().props();
218     }
219
220     private TestShardManager newTestShardManager() {
221         return newTestShardManager(newShardMgrProps());
222     }
223
224     private TestShardManager newTestShardManager(Props props) {
225         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
226         TestShardManager shardManager = shardManagerActor.underlyingActor();
227         shardManager.waitForRecoveryComplete();
228         return shardManager;
229     }
230
231     private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
232         AssertionError last = null;
233         Stopwatch sw = Stopwatch.createStarted();
234         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
235             try {
236                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
237                 kit.expectMsgClass(LocalShardFound.class);
238                 return;
239             } catch(AssertionError e) {
240                 last = e;
241             }
242
243             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
244         }
245
246         throw last;
247     }
248
249     private <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
250         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
251         if(reply instanceof Failure) {
252             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
253         }
254
255         return (T)reply;
256     }
257
258     @Test
259     public void testPerShardDatastoreContext() throws Exception {
260         LOG.info("testPerShardDatastoreContext starting");
261         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
262                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
263
264         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
265                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
266
267         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
268                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
269
270         final MockConfiguration mockConfig = new MockConfiguration() {
271             @Override
272             public Collection<String> getMemberShardNames(String memberName) {
273                 return Arrays.asList("default", "topology");
274             }
275
276             @Override
277             public Collection<String> getMembersFromShardName(String shardName) {
278                 return Arrays.asList("member-1");
279             }
280         };
281
282         final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
283                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
284         final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
285                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
286
287         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
288                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
289         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
290         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
291
292         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
293         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
294         class LocalShardManager extends ShardManager {
295             public LocalShardManager(AbstractBuilder<?> builder) {
296                 super(builder);
297             }
298
299             @Override
300             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
301                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
302                 ActorRef ref = null;
303                 if(entry != null) {
304                     ref = entry.getKey();
305                     entry.setValue(info.getDatastoreContext());
306                 }
307
308                 newShardActorLatch.countDown();
309                 return ref;
310             }
311         }
312
313         final Creator<ShardManager> creator = new Creator<ShardManager>() {
314             private static final long serialVersionUID = 1L;
315             @Override
316             public ShardManager create() throws Exception {
317                 return new LocalShardManager(new GenericBuilder<LocalShardManager>(LocalShardManager.class).
318                         datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
319                         configuration(mockConfig));
320             }
321         };
322
323         JavaTestKit kit = new JavaTestKit(getSystem());
324
325         final ActorRef shardManager = actorFactory.createActor(Props.create(
326                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
327
328         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
329
330         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
331         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
332                 getShardElectionTimeoutFactor());
333         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
334                 getShardElectionTimeoutFactor());
335
336         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
337                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
338         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
339                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
340
341         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
342                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
343
344         shardManager.tell(newMockFactory, kit.getRef());
345
346         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
347         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
348
349         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
350         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
351
352         LOG.info("testPerShardDatastoreContext ending");
353     }
354
355     @Test
356     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
357         new JavaTestKit(getSystem()) {{
358             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
359
360             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361
362             shardManager.tell(new FindPrimary("non-existent", false), getRef());
363
364             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
365         }};
366     }
367
368     @Test
369     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
370         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
371         new JavaTestKit(getSystem()) {{
372             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
373
374             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
375
376             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
377             shardManager.tell(new ActorInitialized(), mockShardActor);
378
379             DataTree mockDataTree = mock(DataTree.class);
380             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
381                     DataStoreVersions.CURRENT_VERSION), getRef());
382
383             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
384             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
385                     RaftState.Leader.name())), mockShardActor);
386
387             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
388
389             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
390             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
391                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
392             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
393         }};
394
395         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
396     }
397
398     @Test
399     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
400         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
401         new JavaTestKit(getSystem()) {{
402             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
403
404             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
405             shardManager.tell(new ActorInitialized(), mockShardActor);
406
407             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
408             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
409             shardManager.tell(new RoleChangeNotification(memberId1,
410                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
411             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
412
413             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
414
415             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
416         }};
417
418         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
419     }
420
421     @Test
422     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
423         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
424         new JavaTestKit(getSystem()) {{
425             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
426
427             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
428             shardManager.tell(new ActorInitialized(), mockShardActor);
429
430             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
431             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
432
433             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
434             shardManager.tell(new RoleChangeNotification(memberId1,
435                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
436             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
437             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
438                     leaderVersion), mockShardActor);
439
440             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
441
442             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
443             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
444                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
445             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
446         }};
447
448         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
449     }
450
451     @Test
452     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
453         new JavaTestKit(getSystem()) {{
454             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
455
456             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
457
458             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
459         }};
460     }
461
462     @Test
463     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
464         new JavaTestKit(getSystem()) {{
465             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
466
467             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
468             shardManager.tell(new ActorInitialized(), mockShardActor);
469
470             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
471
472             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
473         }};
474     }
475
476     @Test
477     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
478         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
479         new JavaTestKit(getSystem()) {{
480             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
481
482             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
483             shardManager.tell(new ActorInitialized(), mockShardActor);
484
485             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
486             shardManager.tell(new RoleChangeNotification(memberId,
487                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
488
489             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
490
491             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
492
493             DataTree mockDataTree = mock(DataTree.class);
494             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
495                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
496
497             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
498
499             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
500             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
501                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
502             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
503         }};
504
505         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
506     }
507
508     @Test
509     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
510         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
511         new JavaTestKit(getSystem()) {{
512             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
513
514             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
515
516             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
517             // delayed until we send ActorInitialized and RoleChangeNotification.
518             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
519
520             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
521
522             shardManager.tell(new ActorInitialized(), mockShardActor);
523
524             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
525
526             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
527             shardManager.tell(new RoleChangeNotification(memberId,
528                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
529
530             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
531
532             DataTree mockDataTree = mock(DataTree.class);
533             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
534                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
535
536             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
537             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
538                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
539             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
540
541             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
542         }};
543
544         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
545     }
546
547     @Test
548     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
549         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
550         new JavaTestKit(getSystem()) {{
551             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
552
553             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
554
555             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
556
557             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
558
559             shardManager.tell(new ActorInitialized(), mockShardActor);
560
561             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
562         }};
563
564         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
565     }
566
567     @Test
568     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
569         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
570         new JavaTestKit(getSystem()) {{
571             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
572
573             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
574             shardManager.tell(new ActorInitialized(), mockShardActor);
575             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
576                     null, RaftState.Candidate.name()), mockShardActor);
577
578             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
579
580             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
581         }};
582
583         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
584     }
585
586     @Test
587     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
588         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
589         new JavaTestKit(getSystem()) {{
590             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
591
592             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
593             shardManager.tell(new ActorInitialized(), mockShardActor);
594             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
595                     null, RaftState.IsolatedLeader.name()), mockShardActor);
596
597             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
598
599             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
600         }};
601
602         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
603     }
604
605     @Test
606     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
607         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
608         new JavaTestKit(getSystem()) {{
609             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
610
611             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
612             shardManager.tell(new ActorInitialized(), mockShardActor);
613
614             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
615
616             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
617         }};
618
619         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
620     }
621
622     @Test
623     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
624         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
625         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
626
627         // Create an ActorSystem ShardManager actor for member-1.
628
629         final ActorSystem system1 = newActorSystem("Member1");
630         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
631
632         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
633                 newTestShardMgrBuilderWithMockShardActor().cluster(
634                         new ClusterWrapperImpl(system1)).props(), shardManagerID);
635
636         // Create an ActorSystem ShardManager actor for member-2.
637
638         final ActorSystem system2 = newActorSystem("Member2");
639
640         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
641
642         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
643
644         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
645                 put("default", Arrays.asList("member-1", "member-2")).
646                 put("astronauts", Arrays.asList("member-2")).build());
647
648         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
649                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
650                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
651
652         new JavaTestKit(system1) {{
653
654             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
655             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
656
657             shardManager2.tell(new ActorInitialized(), mockShardActor2);
658
659             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
660             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
661             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
662                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
663             shardManager2.tell(new RoleChangeNotification(memberId2,
664                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
665
666             shardManager1.underlyingActor().waitForMemberUp();
667
668             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
669
670             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
671             String path = found.getPrimaryPath();
672             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
673             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
674
675             shardManager2.underlyingActor().verifyFindPrimary();
676
677             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
678
679             shardManager1.underlyingActor().waitForMemberRemoved();
680
681             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
682
683             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
684         }};
685
686         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
687     }
688
689     @Test
690     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
691         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
692         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
693
694         // Create an ActorSystem ShardManager actor for member-1.
695
696         final ActorSystem system1 = newActorSystem("Member1");
697         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
698
699         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
700
701         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
702                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
703                         new ClusterWrapperImpl(system1)).props(), shardManagerID);
704
705         // Create an ActorSystem ShardManager actor for member-2.
706
707         final ActorSystem system2 = newActorSystem("Member2");
708
709         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
710
711         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
712
713         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
714             put("default", Arrays.asList("member-1", "member-2")).build());
715
716         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
717                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
718                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
719
720         new JavaTestKit(system1) {{
721
722             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
723             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
724             shardManager1.tell(new ActorInitialized(), mockShardActor1);
725             shardManager2.tell(new ActorInitialized(), mockShardActor2);
726
727             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
728             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
729             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
730                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
731             shardManager1.tell(new RoleChangeNotification(memberId1,
732                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
733             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
734                     DataStoreVersions.CURRENT_VERSION),
735                 mockShardActor2);
736             shardManager2.tell(new RoleChangeNotification(memberId2,
737                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
738             shardManager1.underlyingActor().waitForMemberUp();
739
740             shardManager1.tell(new FindPrimary("default", true), getRef());
741
742             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
743             String path = found.getPrimaryPath();
744             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
745
746             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
747                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
748
749             shardManager1.underlyingActor().waitForUnreachableMember();
750
751             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
752             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
753             MessageCollectorActor.clearMessages(mockShardActor1);
754
755             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
756                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
757
758             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
759
760             shardManager1.tell(new FindPrimary("default", true), getRef());
761
762             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
763
764             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
765                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
766
767             shardManager1.underlyingActor().waitForReachableMember();
768
769             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
770             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
771             MessageCollectorActor.clearMessages(mockShardActor1);
772
773             shardManager1.tell(new FindPrimary("default", true), getRef());
774
775             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
776             String path1 = found1.getPrimaryPath();
777             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
778
779             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
780                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
781
782             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
783
784         }};
785
786         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
787     }
788
789     @Test
790     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
791         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
792         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
793
794         // Create an ActorSystem ShardManager actor for member-1.
795
796         final ActorSystem system1 = newActorSystem("Member1");
797         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
798
799         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
800
801         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
802         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
803                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
804                         new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(),
805                 shardManagerID);
806
807         // Create an ActorSystem ShardManager actor for member-2.
808
809         final ActorSystem system2 = newActorSystem("Member2");
810
811         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
812
813         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
814
815         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
816             put("default", Arrays.asList("member-1", "member-2")).build());
817
818         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
819                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
820                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
821
822         new JavaTestKit(system1) {{
823             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
824             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
825             shardManager1.tell(new ActorInitialized(), mockShardActor1);
826             shardManager2.tell(new ActorInitialized(), mockShardActor2);
827
828             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
829             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
830             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
831                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
832             shardManager1.tell(new RoleChangeNotification(memberId1,
833                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
834             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
835                     DataStoreVersions.CURRENT_VERSION),
836                 mockShardActor2);
837             shardManager2.tell(new RoleChangeNotification(memberId2,
838                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
839             shardManager1.underlyingActor().waitForMemberUp();
840
841             shardManager1.tell(new FindPrimary("default", true), getRef());
842
843             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
844             String path = found.getPrimaryPath();
845             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
846
847             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
848                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
849
850             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
851                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
852
853             shardManager1.underlyingActor().waitForUnreachableMember();
854
855             shardManager1.tell(new FindPrimary("default", true), getRef());
856
857             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
858
859             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
860
861             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
862                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
863             shardManager1.tell(new RoleChangeNotification(memberId1,
864                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
865
866             shardManager1.tell(new FindPrimary("default", true), getRef());
867
868             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
869             String path1 = found1.getPrimaryPath();
870             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
871
872         }};
873
874         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
875     }
876
877
878     @Test
879     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
880         new JavaTestKit(getSystem()) {{
881             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
882
883             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
884
885             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
886
887             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
888
889             assertEquals("getShardName", "non-existent", notFound.getShardName());
890         }};
891     }
892
893     @Test
894     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
895         new JavaTestKit(getSystem()) {{
896             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
897
898             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
899             shardManager.tell(new ActorInitialized(), mockShardActor);
900
901             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
902
903             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
904
905             assertTrue("Found path contains " + found.getPath().path().toString(),
906                     found.getPath().path().toString().contains("member-1-shard-default-config"));
907         }};
908     }
909
910     @Test
911     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
912         new JavaTestKit(getSystem()) {{
913             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
914
915             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
916
917             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
918         }};
919     }
920
921     @Test
922     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
923         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
924         new JavaTestKit(getSystem()) {{
925             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
926
927             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
928
929             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
930             // delayed until we send ActorInitialized.
931             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
932                     new Timeout(5, TimeUnit.SECONDS));
933
934             shardManager.tell(new ActorInitialized(), mockShardActor);
935
936             Object resp = Await.result(future, duration("5 seconds"));
937             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
938         }};
939
940         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
941     }
942
943     @Test
944     public void testOnRecoveryJournalIsCleaned() {
945         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
946         InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
947                 ImmutableSet.of("foo")));
948         InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
949                 ImmutableSet.of("bar")));
950         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
951
952         TestShardManager shardManager = newTestShardManager();
953
954         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
955
956         // Journal entries up to the last one should've been deleted
957         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
958         synchronized (journal) {
959             assertEquals("Journal size", 0, journal.size());
960         }
961     }
962
963     @Test
964     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
965         TestShardManager shardManager = newTestShardManager();
966
967         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
968         shardManager.onReceiveCommand(new RoleChangeNotification(
969                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
970
971         verify(ready, never()).countDown();
972
973         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
974                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
975
976         verify(ready, times(1)).countDown();
977     }
978
979     @Test
980     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
981         new JavaTestKit(getSystem()) {{
982             TestShardManager shardManager = newTestShardManager();
983
984             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
985             shardManager.onReceiveCommand(new RoleChangeNotification(
986                     memberId, null, RaftState.Follower.name()));
987
988             verify(ready, never()).countDown();
989
990             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
991
992             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
993                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
994                     DataStoreVersions.CURRENT_VERSION));
995
996             verify(ready, times(1)).countDown();
997         }};
998     }
999
1000     @Test
1001     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1002         new JavaTestKit(getSystem()) {{
1003             TestShardManager shardManager = newTestShardManager();
1004
1005             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1006             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1007
1008             verify(ready, never()).countDown();
1009
1010             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1011                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
1012                     DataStoreVersions.CURRENT_VERSION));
1013
1014             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1015
1016             verify(ready, times(1)).countDown();
1017         }};
1018     }
1019
1020     @Test
1021     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1022         TestShardManager shardManager = newTestShardManager();
1023
1024         shardManager.onReceiveCommand(new RoleChangeNotification(
1025                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1026
1027         verify(ready, never()).countDown();
1028     }
1029
1030     @Test
1031     public void testByDefaultSyncStatusIsFalse() throws Exception{
1032         TestShardManager shardManager = newTestShardManager();
1033
1034         assertEquals(false, shardManager.getMBean().getSyncStatus());
1035     }
1036
1037     @Test
1038     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1039         TestShardManager shardManager = newTestShardManager();
1040
1041         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1042                 RaftState.Follower.name(), RaftState.Leader.name()));
1043
1044         assertEquals(true, shardManager.getMBean().getSyncStatus());
1045     }
1046
1047     @Test
1048     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1049         TestShardManager shardManager = newTestShardManager();
1050
1051         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1052         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1053                 RaftState.Follower.name(), RaftState.Candidate.name()));
1054
1055         assertEquals(false, shardManager.getMBean().getSyncStatus());
1056
1057         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1058         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1059                 true, shardId));
1060
1061         assertEquals(false, shardManager.getMBean().getSyncStatus());
1062     }
1063
1064     @Test
1065     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1066         TestShardManager shardManager = newTestShardManager();
1067
1068         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1069         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1070                 RaftState.Candidate.name(), RaftState.Follower.name()));
1071
1072         // Initially will be false
1073         assertEquals(false, shardManager.getMBean().getSyncStatus());
1074
1075         // Send status true will make sync status true
1076         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1077
1078         assertEquals(true, shardManager.getMBean().getSyncStatus());
1079
1080         // Send status false will make sync status false
1081         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1082
1083         assertEquals(false, shardManager.getMBean().getSyncStatus());
1084
1085     }
1086
1087     @Test
1088     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1089         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1090         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1091             @Override
1092             public List<String> getMemberShardNames(String memberName) {
1093                 return Arrays.asList("default", "astronauts");
1094             }
1095         }));
1096
1097         // Initially will be false
1098         assertEquals(false, shardManager.getMBean().getSyncStatus());
1099
1100         // Make default shard leader
1101         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1102         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1103                 RaftState.Follower.name(), RaftState.Leader.name()));
1104
1105         // default = Leader, astronauts is unknown so sync status remains false
1106         assertEquals(false, shardManager.getMBean().getSyncStatus());
1107
1108         // Make astronauts shard leader as well
1109         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1110         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1111                 RaftState.Follower.name(), RaftState.Leader.name()));
1112
1113         // Now sync status should be true
1114         assertEquals(true, shardManager.getMBean().getSyncStatus());
1115
1116         // Make astronauts a Follower
1117         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1118                 RaftState.Leader.name(), RaftState.Follower.name()));
1119
1120         // Sync status is not true
1121         assertEquals(false, shardManager.getMBean().getSyncStatus());
1122
1123         // Make the astronauts follower sync status true
1124         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1125
1126         // Sync status is now true
1127         assertEquals(true, shardManager.getMBean().getSyncStatus());
1128
1129         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1130     }
1131
1132     @Test
1133     public void testOnReceiveSwitchShardBehavior() throws Exception {
1134         new JavaTestKit(getSystem()) {{
1135             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1136
1137             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1138             shardManager.tell(new ActorInitialized(), mockShardActor);
1139
1140             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
1141
1142             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1143
1144             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1145             assertEquals(1000, switchBehavior.getNewTerm());
1146         }};
1147     }
1148
1149     @Test
1150     public void testOnCreateShard() {
1151         LOG.info("testOnCreateShard starting");
1152         new JavaTestKit(getSystem()) {{
1153             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1154
1155             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1156                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1157
1158             SchemaContext schemaContext = TestModel.createTestContext();
1159             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1160
1161             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1162                     persistent(false).build();
1163             Shard.Builder shardBuilder = Shard.builder();
1164
1165             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1166                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1167             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1168
1169             expectMsgClass(duration("5 seconds"), Success.class);
1170
1171             shardManager.tell(new FindLocalShard("foo", true), getRef());
1172
1173             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1174
1175             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1176             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1177                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1178             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1179                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1180                     shardBuilder.getPeerAddresses().keySet());
1181             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1182                     shardBuilder.getId());
1183             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1184
1185             // Send CreateShard with same name - should return Success with a message.
1186
1187             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1188
1189             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1190             assertNotNull("Success status is null", success.status());
1191         }};
1192
1193         LOG.info("testOnCreateShard ending");
1194     }
1195
1196     @Test
1197     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1198         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1199         new JavaTestKit(getSystem()) {{
1200             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1201
1202             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1203                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1204
1205             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1206
1207             Shard.Builder shardBuilder = Shard.builder();
1208             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1209                     "foo", null, Arrays.asList("member-5", "member-6"));
1210
1211             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1212             expectMsgClass(duration("5 seconds"), Success.class);
1213
1214             shardManager.tell(new FindLocalShard("foo", true), getRef());
1215             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1216
1217             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1218             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1219                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1220         }};
1221
1222         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1223     }
1224
1225     @Test
1226     public void testOnCreateShardWithNoInitialSchemaContext() {
1227         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1228         new JavaTestKit(getSystem()) {{
1229             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1230                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1231
1232             Shard.Builder shardBuilder = Shard.builder();
1233
1234             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1235                     "foo", null, Arrays.asList("member-1"));
1236             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1237
1238             expectMsgClass(duration("5 seconds"), Success.class);
1239
1240             SchemaContext schemaContext = TestModel.createTestContext();
1241             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1242
1243             shardManager.tell(new FindLocalShard("foo", true), getRef());
1244
1245             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1246
1247             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1248             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1249         }};
1250
1251         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1252     }
1253
1254     @Test
1255     public void testGetSnapshot() throws Throwable {
1256         LOG.info("testGetSnapshot starting");
1257         JavaTestKit kit = new JavaTestKit(getSystem());
1258
1259         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1260                    put("shard1", Arrays.asList("member-1")).
1261                    put("shard2", Arrays.asList("member-1")).
1262                    put("astronauts", Collections.<String>emptyList()).build());
1263
1264         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1265                 withDispatcher(Dispatchers.DefaultDispatcherId()));
1266
1267         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1268         Failure failure = kit.expectMsgClass(Failure.class);
1269         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1270
1271         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1272
1273         waitForShardInitialized(shardManager, "shard1", kit);
1274         waitForShardInitialized(shardManager, "shard2", kit);
1275
1276         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1277
1278         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1279
1280         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1281         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1282
1283         Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
1284             @Override
1285             public String apply(ShardSnapshot s) {
1286                 return s.getName();
1287             }
1288         };
1289
1290         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1291                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1292
1293         // Add a new replica
1294
1295         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1296
1297         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1298         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1299
1300         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1301         mockShardLeaderKit.expectMsgClass(AddServer.class);
1302         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1303         kit.expectMsgClass(Status.Success.class);
1304         waitForShardInitialized(shardManager, "astronauts", kit);
1305
1306         // Send another GetSnapshot and verify
1307
1308         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1309         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1310
1311         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1312                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1313
1314         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1315         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1316         ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1317         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1318                 Sets.newHashSet(snapshot.getShardList()));
1319
1320         LOG.info("testGetSnapshot ending");
1321     }
1322
1323     @Test
1324     public void testRestoreFromSnapshot() throws Throwable {
1325         LOG.info("testRestoreFromSnapshot starting");
1326
1327         JavaTestKit kit = new JavaTestKit(getSystem());
1328
1329         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1330                    put("shard1", Collections.<String>emptyList()).
1331                    put("shard2", Collections.<String>emptyList()).
1332                    put("astronauts", Collections.<String>emptyList()).build());
1333
1334
1335         ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1336         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1337                 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1338         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1339                 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1340
1341         shardManager.underlyingActor().waitForRecoveryComplete();
1342
1343         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1344
1345         waitForShardInitialized(shardManager, "shard1", kit);
1346         waitForShardInitialized(shardManager, "shard2", kit);
1347         waitForShardInitialized(shardManager, "astronauts", kit);
1348
1349         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1350
1351         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1352
1353         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1354
1355         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1356         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1357         snapshot = SerializationUtils.deserialize(snapshotBytes);
1358         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1359                 Sets.newHashSet(snapshot.getShardList()));
1360
1361         LOG.info("testRestoreFromSnapshot ending");
1362     }
1363
1364     @Test
1365     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1366         new JavaTestKit(getSystem()) {{
1367             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1368                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1369
1370             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1371             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1372
1373             assertEquals("Failure obtained", true,
1374                           (resp.cause() instanceof IllegalArgumentException));
1375         }};
1376     }
1377
1378     @Test
1379     public void testAddShardReplica() throws Exception {
1380         LOG.info("testAddShardReplica starting");
1381
1382         MockConfiguration mockConfig =
1383                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1384                    put("default", Arrays.asList("member-1", "member-2")).
1385                    put("astronauts", Arrays.asList("member-2")).build());
1386
1387         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1388
1389         // Create an ActorSystem ShardManager actor for member-1.
1390         final ActorSystem system1 = newActorSystem("Member1");
1391         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1392         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1393         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1394                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1395                         new ClusterWrapperImpl(system1)).props(), shardManagerID);
1396
1397         // Create an ActorSystem ShardManager actor for member-2.
1398         final ActorSystem system2 = newActorSystem("Member2");
1399         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1400
1401         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1402         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1403                 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1404         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1405                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1406                         new ClusterWrapperImpl(system2)).props(), shardManagerID);
1407
1408         new JavaTestKit(system1) {{
1409
1410             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1411             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1412
1413             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1414
1415             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1416             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1417             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1418                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1419             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1420                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1421
1422             newReplicaShardManager.underlyingActor().waitForMemberUp();
1423             leaderShardManager.underlyingActor().waitForMemberUp();
1424
1425             //construct a mock response message
1426             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1427             mockShardLeaderActor.underlyingActor().updateResponse(response);
1428             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1429             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1430                 AddServer.class);
1431             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1432             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1433             newReplicaShardManager.underlyingActor()
1434                 .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
1435             expectMsgClass(duration("5 seconds"), Status.Success.class);
1436         }};
1437
1438         LOG.info("testAddShardReplica ending");
1439     }
1440
1441     @Test
1442     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1443         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1444         new JavaTestKit(getSystem()) {{
1445             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1446                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1447
1448             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1449             shardManager.tell(new ActorInitialized(), mockShardActor);
1450
1451             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1452             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1453             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1454                     Props.create(MockRespondActor.class, addServerReply), leaderId);
1455
1456             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1457
1458             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1459             shardManager.tell(new RoleChangeNotification(newReplicaId,
1460                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1461             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
1462                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1463
1464             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1465
1466             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1467
1468             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1469             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1470
1471             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1472             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1473
1474             // Send message again to verify previous in progress state is cleared
1475
1476             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1477             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1478             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1479
1480             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1481
1482             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1483                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1484             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1485             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1486             expectMsgClass(duration("5 seconds"), Failure.class);
1487
1488             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1489             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1490         }};
1491
1492         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1493     }
1494
1495     @Test
1496     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1497         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1498         new JavaTestKit(getSystem()) {{
1499             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1500             ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1501
1502             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1503             shardManager.tell(new ActorInitialized(), mockShardActor);
1504             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1505                     DataStoreVersions.CURRENT_VERSION), getRef());
1506             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1507                     RaftState.Leader.name())), mockShardActor);
1508
1509             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1510             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1511             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1512
1513             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1514             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1515         }};
1516
1517         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1518     }
1519
1520     @Test
1521     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1522         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1523         new JavaTestKit(getSystem()) {{
1524             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1525
1526             MockConfiguration mockConfig =
1527                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1528                        put("astronauts", Arrays.asList("member-2")).build());
1529
1530             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1531             final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1532                     newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1533             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1534
1535             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1536
1537             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1538             terminateWatcher.watch(mockNewReplicaShardActor);
1539
1540             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1541
1542             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1543             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1544                     addServerMsg.getNewServerId());
1545             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1546
1547             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1548             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1549
1550             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1551             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1552
1553             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1554
1555             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1556             mockShardLeaderKit.expectMsgClass(AddServer.class);
1557             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1558             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1559             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1560         }};
1561
1562         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1563     }
1564
1565     @Test
1566     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1567         LOG.info("testAddShardReplicaWithAlreadyInProgress starting");
1568         new JavaTestKit(getSystem()) {{
1569             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1570             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1571
1572             MockConfiguration mockConfig =
1573                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1574                        put("astronauts", Arrays.asList("member-2")).build());
1575
1576             final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1577                     newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID);
1578             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1579
1580             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1581
1582             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1583
1584             mockShardLeaderKit.expectMsgClass(AddServer.class);
1585
1586             shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
1587
1588             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1589         }};
1590
1591         LOG.info("testAddShardReplicaWithAlreadyInProgress ending");
1592     }
1593
1594     @Test
1595     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1596         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1597         new JavaTestKit(getSystem()) {{
1598             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1599                        put("astronauts", Arrays.asList("member-2")).build());
1600
1601             final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1602                     shardActor(mockShardActor).props(), shardMgrID);
1603
1604             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1605             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1606
1607             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1608             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1609             assertEquals("Failure obtained", true,
1610                           (resp.cause() instanceof RuntimeException));
1611         }};
1612
1613         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1614     }
1615
1616     @Test
1617     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1618         new JavaTestKit(getSystem()) {{
1619             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1620                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1621
1622             shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
1623             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1624             assertEquals("Failure obtained", true,
1625                          (resp.cause() instanceof IllegalArgumentException));
1626         }};
1627
1628     }
1629
1630     @Test
1631     public void testServerRemovedShardActorNotRunning() throws Exception {
1632         LOG.info("testServerRemovedShardActorNotRunning starting");
1633         new JavaTestKit(getSystem()) {{
1634             MockConfiguration mockConfig =
1635                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1636                             put("default", Arrays.asList("member-1", "member-2")).
1637                             put("astronauts", Arrays.asList("member-2")).
1638                             put("people", Arrays.asList("member-1", "member-2")).build());
1639
1640             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1641
1642             shardManager.underlyingActor().waitForRecoveryComplete();
1643             shardManager.tell(new FindLocalShard("people", false), getRef());
1644             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1645
1646             shardManager.tell(new FindLocalShard("default", false), getRef());
1647             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1648
1649             // Removed the default shard replica from member-1
1650             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1651             ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
1652             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1653
1654             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1655         }};
1656
1657         LOG.info("testServerRemovedShardActorNotRunning ending");
1658     }
1659
1660     @Test
1661     public void testServerRemovedShardActorRunning() throws Exception {
1662         LOG.info("testServerRemovedShardActorRunning starting");
1663         new JavaTestKit(getSystem()) {{
1664             MockConfiguration mockConfig =
1665                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1666                             put("default", Arrays.asList("member-1", "member-2")).
1667                             put("astronauts", Arrays.asList("member-2")).
1668                             put("people", Arrays.asList("member-1", "member-2")).build());
1669
1670             String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
1671                     type(shardMrgIDSuffix).build().toString();
1672             TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1673                     MessageCollectorActor.props(), shardId);
1674
1675             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1676                     newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1677
1678             watch(shard);
1679
1680             shardManager.underlyingActor().waitForRecoveryComplete();
1681
1682             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1683             shardManager.tell(new ActorInitialized(), shard);
1684
1685             waitForShardInitialized(shardManager, "people", this);
1686             waitForShardInitialized(shardManager, "default", this);
1687
1688             // Removed the default shard replica from member-1
1689             shardManager.tell(new ServerRemoved(shardId), getRef());
1690
1691             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1692
1693             expectMsgClass(duration("5 seconds"), Terminated.class);
1694         }};
1695
1696         LOG.info("testServerRemovedShardActorRunning ending");
1697     }
1698
1699
1700     @Test
1701     public void testShardPersistenceWithRestoredData() throws Exception {
1702         LOG.info("testShardPersistenceWithRestoredData starting");
1703         new JavaTestKit(getSystem()) {{
1704             MockConfiguration mockConfig =
1705                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1706                    put("default", Arrays.asList("member-1", "member-2")).
1707                    put("astronauts", Arrays.asList("member-2")).
1708                    put("people", Arrays.asList("member-1", "member-2")).build());
1709             String[] restoredShards = {"default", "astronauts"};
1710             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1711             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1712
1713             //create shardManager to come up with restored data
1714             TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1715                     newShardMgrProps(mockConfig));
1716
1717             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1718
1719             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1720             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1721             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1722
1723             //Verify a local shard is created for the restored shards,
1724             //although we expect a NotInitializedException for the shards as the actor initialization
1725             //message is not sent for them
1726             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1727             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1728
1729             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1730             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1731         }};
1732
1733         LOG.info("testShardPersistenceWithRestoredData ending");
1734     }
1735
1736
1737     private static class TestShardManager extends ShardManager {
1738         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1739         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1740         private ShardManagerSnapshot snapshot;
1741         private final Map<String, ActorRef> shardActors;
1742         private final ActorRef shardActor;
1743         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1744         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1745         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1746         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1747         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1748         private volatile MessageInterceptor messageInterceptor;
1749
1750         private TestShardManager(Builder builder) {
1751             super(builder);
1752             shardActor = builder.shardActor;
1753             shardActors = builder.shardActors;
1754         }
1755
1756         @Override
1757         public void handleRecover(Object message) throws Exception {
1758             try {
1759                 super.handleRecover(message);
1760             } finally {
1761                 if(message instanceof RecoveryCompleted) {
1762                     recoveryComplete.countDown();
1763                 }
1764             }
1765         }
1766
1767         @Override
1768         public void handleCommand(Object message) throws Exception {
1769             try{
1770                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
1771                     getSender().tell(messageInterceptor.apply(message), getSelf());
1772                 } else {
1773                     super.handleCommand(message);
1774                 }
1775             } finally {
1776                 if(message instanceof FindPrimary) {
1777                     findPrimaryMessageReceived.countDown();
1778                 } else if(message instanceof ClusterEvent.MemberUp) {
1779                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1780                     if(!getCluster().getCurrentMemberName().equals(role)) {
1781                         memberUpReceived.countDown();
1782                     }
1783                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1784                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1785                     if(!getCluster().getCurrentMemberName().equals(role)) {
1786                         memberRemovedReceived.countDown();
1787                     }
1788                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1789                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1790                     if(!getCluster().getCurrentMemberName().equals(role)) {
1791                         memberUnreachableReceived.countDown();
1792                     }
1793                 } else if(message instanceof ClusterEvent.ReachableMember) {
1794                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1795                     if(!getCluster().getCurrentMemberName().equals(role)) {
1796                         memberReachableReceived.countDown();
1797                     }
1798                 }
1799             }
1800         }
1801
1802         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
1803             this.messageInterceptor = messageInterceptor;
1804         }
1805
1806         void waitForRecoveryComplete() {
1807             assertEquals("Recovery complete", true,
1808                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1809         }
1810
1811         void waitForMemberUp() {
1812             assertEquals("MemberUp received", true,
1813                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1814             memberUpReceived = new CountDownLatch(1);
1815         }
1816
1817         void waitForMemberRemoved() {
1818             assertEquals("MemberRemoved received", true,
1819                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1820             memberRemovedReceived = new CountDownLatch(1);
1821         }
1822
1823         void waitForUnreachableMember() {
1824             assertEquals("UnreachableMember received", true,
1825                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1826                 ));
1827             memberUnreachableReceived = new CountDownLatch(1);
1828         }
1829
1830         void waitForReachableMember() {
1831             assertEquals("ReachableMember received", true,
1832                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1833             memberReachableReceived = new CountDownLatch(1);
1834         }
1835
1836         void verifyFindPrimary() {
1837             assertEquals("FindPrimary received", true,
1838                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1839             findPrimaryMessageReceived = new CountDownLatch(1);
1840         }
1841
1842         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
1843             return new Builder(datastoreContextBuilder);
1844         }
1845
1846         private static class Builder extends AbstractGenericBuilder<Builder, TestShardManager> {
1847             private ActorRef shardActor;
1848             private final Map<String, ActorRef> shardActors = new HashMap<>();
1849
1850             Builder(DatastoreContext.Builder datastoreContextBuilder) {
1851                 super(TestShardManager.class);
1852                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
1853             }
1854
1855             Builder shardActor(ActorRef shardActor) {
1856                 this.shardActor = shardActor;
1857                 return this;
1858             }
1859
1860             Builder addShardActor(String shardName, ActorRef actorRef){
1861                 shardActors.put(shardName, actorRef);
1862                 return this;
1863             }
1864         }
1865
1866         @Override
1867         public void saveSnapshot(Object obj) {
1868             snapshot = (ShardManagerSnapshot) obj;
1869             snapshotPersist.countDown();
1870         }
1871
1872         void verifySnapshotPersisted(Set<String> shardList) {
1873             assertEquals("saveSnapshot invoked", true,
1874                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
1875             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
1876         }
1877
1878         @Override
1879         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1880             if(shardActors.get(info.getShardName()) != null){
1881                 return shardActors.get(info.getShardName());
1882             }
1883
1884             if(shardActor != null) {
1885                 return shardActor;
1886             }
1887
1888             return super.newShardActor(schemaContext, info);
1889         }
1890     }
1891
1892     private static abstract class AbstractGenericBuilder<T extends AbstractGenericBuilder<T, ?>, C extends ShardManager>
1893                                                      extends ShardManager.AbstractBuilder<T> {
1894         private final Class<C> shardManagerClass;
1895
1896         AbstractGenericBuilder(Class<C> shardManagerClass) {
1897             this.shardManagerClass = shardManagerClass;
1898             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
1899                     waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
1900         }
1901
1902         @Override
1903         public Props props() {
1904             verify();
1905             return Props.create(shardManagerClass, this);
1906         }
1907     }
1908
1909     private static class GenericBuilder<C extends ShardManager> extends AbstractGenericBuilder<GenericBuilder<C>, C> {
1910         GenericBuilder(Class<C> shardManagerClass) {
1911             super(shardManagerClass);
1912         }
1913     }
1914
1915     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1916         private static final long serialVersionUID = 1L;
1917         private final Creator<ShardManager> delegate;
1918
1919         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1920             this.delegate = delegate;
1921         }
1922
1923         @Override
1924         public ShardManager create() throws Exception {
1925             return delegate.create();
1926         }
1927     }
1928
1929     interface MessageInterceptor extends Function<Object, Object> {
1930         boolean canIntercept(Object message);
1931     }
1932
1933     private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
1934         return new MessageInterceptor(){
1935             @Override
1936             public Object apply(Object message) {
1937                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
1938             }
1939
1940             @Override
1941             public boolean canIntercept(Object message) {
1942                 return message instanceof FindPrimary;
1943             }
1944         };
1945     }
1946
1947     private static class MockRespondActor extends MessageCollectorActor {
1948         static final String CLEAR_RESPONSE = "clear-response";
1949
1950         private volatile Object responseMsg;
1951
1952         @SuppressWarnings("unused")
1953         public MockRespondActor() {
1954         }
1955
1956         @SuppressWarnings("unused")
1957         public MockRespondActor(Object responseMsg) {
1958             this.responseMsg = responseMsg;
1959         }
1960
1961         public void updateResponse(Object response) {
1962             responseMsg = response;
1963         }
1964
1965         @Override
1966         public void onReceive(Object message) throws Exception {
1967             super.onReceive(message);
1968             if (message instanceof AddServer) {
1969                 if (responseMsg != null) {
1970                     getSender().tell(responseMsg, getSelf());
1971                 }
1972             } if(message.equals(CLEAR_RESPONSE)) {
1973                 responseMsg = null;
1974             }
1975         }
1976     }
1977 }