Move SwitchShardBehaviorMessage to shardmanager package
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import akka.actor.AddressFromURIString;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent;
30 import akka.dispatch.Dispatchers;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.RecoveryCompleted;
34 import akka.serialization.Serialization;
35 import akka.testkit.JavaTestKit;
36 import akka.testkit.TestActorRef;
37 import akka.util.Timeout;
38 import com.google.common.base.Function;
39 import com.google.common.base.Optional;
40 import com.google.common.base.Stopwatch;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Sets;
45 import com.google.common.util.concurrent.Uninterruptibles;
46 import com.typesafe.config.ConfigFactory;
47 import java.net.URI;
48 import java.util.AbstractMap;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.Collections;
53 import java.util.HashMap;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Map.Entry;
57 import java.util.Set;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import org.apache.commons.lang3.SerializationUtils;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.mockito.Mock;
66 import org.mockito.Mockito;
67 import org.mockito.MockitoAnnotations;
68 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
69 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
70 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
71 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
72 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
73 import org.opendaylight.controller.cluster.datastore.Shard;
74 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
75 import org.opendaylight.controller.cluster.datastore.config.Configuration;
76 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
77 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
78 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
79 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
80 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
81 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
82 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
83 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
84 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
85 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
86 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
87 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
88 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
89 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
90 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
91 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
92 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
93 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
94 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
95 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
96 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
97 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
98 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
99 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
100 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
101 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
102 import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor;
103 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
104 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
105 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
106 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
107 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
108 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
109 import org.opendaylight.controller.cluster.raft.RaftState;
110 import org.opendaylight.controller.cluster.raft.TestActorFactory;
111 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
112 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
113 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
114 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
115 import org.opendaylight.controller.cluster.raft.messages.AddServer;
116 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
117 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
118 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
119 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
120 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
121 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
122 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
123 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
124 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
125 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
126 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
127 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
128 import org.slf4j.Logger;
129 import org.slf4j.LoggerFactory;
130 import scala.concurrent.Await;
131 import scala.concurrent.Future;
132 import scala.concurrent.duration.FiniteDuration;
133
134 public class ShardManagerTest extends AbstractActorTest {
135     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
136
137     private static int ID_COUNTER = 1;
138
139     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
140     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
141
142     @Mock
143     private static CountDownLatch ready;
144
145     private static TestActorRef<MessageCollectorActor> mockShardActor;
146
147     private static String mockShardName;
148
149     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
150             dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
151                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
152
153     private final Collection<ActorSystem> actorSystems = new ArrayList<>();
154
155     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
156
157     @Before
158     public void setUp() {
159         MockitoAnnotations.initMocks(this);
160
161         InMemoryJournal.clear();
162         InMemorySnapshotStore.clear();
163
164         if(mockShardActor == null) {
165             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
166             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
167         }
168
169         mockShardActor.underlyingActor().clear();
170     }
171
172     @After
173     public void tearDown() {
174         InMemoryJournal.clear();
175         InMemorySnapshotStore.clear();
176
177         for(ActorSystem system: actorSystems) {
178             JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
179         }
180
181         actorFactory.close();
182     }
183
184     private ActorSystem newActorSystem(String config) {
185         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
186         actorSystems.add(system);
187         return system;
188     }
189
190     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
191         String name = new ShardIdentifier(shardName, memberName,"config").toString();
192         if(system == getSystem()) {
193             return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
194         }
195
196         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
197     }
198
199     private Props newShardMgrProps() {
200         return newShardMgrProps(new MockConfiguration());
201     }
202
203     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
204         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
205         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
206         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
207         return mockFactory;
208     }
209
210     private TestShardManager.Builder newTestShardMgrBuilder() {
211         return TestShardManager.builder(datastoreContextBuilder);
212     }
213
214     private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
215         return TestShardManager.builder(datastoreContextBuilder).configuration(config);
216     }
217
218     private Props newShardMgrProps(Configuration config) {
219         return newTestShardMgrBuilder(config).props();
220     }
221
222     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
223         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
224     }
225
226     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) {
227         return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor);
228     }
229
230
231     private Props newPropsShardMgrWithMockShardActor() {
232         return newTestShardMgrBuilderWithMockShardActor().props();
233     }
234
235     private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) {
236         return newTestShardMgrBuilderWithMockShardActor(shardActor).props();
237     }
238
239
240     private TestShardManager newTestShardManager() {
241         return newTestShardManager(newShardMgrProps());
242     }
243
244     private TestShardManager newTestShardManager(Props props) {
245         TestActorRef<TestShardManager> shardManagerActor = actorFactory.createTestActor(props);
246         TestShardManager shardManager = shardManagerActor.underlyingActor();
247         shardManager.waitForRecoveryComplete();
248         return shardManager;
249     }
250
251     private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) {
252         AssertionError last = null;
253         Stopwatch sw = Stopwatch.createStarted();
254         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
255             try {
256                 shardManager.tell(new FindLocalShard(shardName, true), kit.getRef());
257                 kit.expectMsgClass(LocalShardFound.class);
258                 return;
259             } catch(AssertionError e) {
260                 last = e;
261             }
262
263             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
264         }
265
266         throw last;
267     }
268
269     private static <T> T expectMsgClassOrFailure(Class<T> msgClass, JavaTestKit kit, String msg) {
270         Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
271         if(reply instanceof Failure) {
272             throw new AssertionError(msg + " failed", ((Failure)reply).cause());
273         }
274
275         return (T)reply;
276     }
277
278     @Test
279     public void testPerShardDatastoreContext() throws Exception {
280         LOG.info("testPerShardDatastoreContext starting");
281         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
282                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
283
284         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
285                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
286
287         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
288                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
289
290         final MockConfiguration mockConfig = new MockConfiguration() {
291             @Override
292             public Collection<String> getMemberShardNames(String memberName) {
293                 return Arrays.asList("default", "topology");
294             }
295
296             @Override
297             public Collection<String> getMembersFromShardName(String shardName) {
298                 return Arrays.asList("member-1");
299             }
300         };
301
302         final TestActorRef<MessageCollectorActor> defaultShardActor = actorFactory.createTestActor(
303                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default"));
304         final TestActorRef<MessageCollectorActor> topologyShardActor = actorFactory.createTestActor(
305                 Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology"));
306
307         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
308                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
309         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
310         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
311
312         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
313         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
314         class LocalShardManager extends ShardManager {
315             public LocalShardManager(AbstractBuilder<?> builder) {
316                 super(builder);
317             }
318
319             @Override
320             protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
321                 Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
322                 ActorRef ref = null;
323                 if(entry != null) {
324                     ref = entry.getKey();
325                     entry.setValue(info.getDatastoreContext());
326                 }
327
328                 newShardActorLatch.countDown();
329                 return ref;
330             }
331         }
332
333         final Creator<ShardManager> creator = new Creator<ShardManager>() {
334             private static final long serialVersionUID = 1L;
335             @Override
336             public ShardManager create() throws Exception {
337                 return new LocalShardManager(new GenericBuilder<LocalShardManager>(LocalShardManager.class).
338                         datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache).
339                         configuration(mockConfig));
340             }
341         };
342
343         JavaTestKit kit = new JavaTestKit(getSystem());
344
345         final ActorRef shardManager = actorFactory.createActor(Props.create(
346                 new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
347
348         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
349
350         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
351         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
352                 getShardElectionTimeoutFactor());
353         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
354                 getShardElectionTimeoutFactor());
355
356         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
357                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
358         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
359                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
360
361         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
362                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
363
364         shardManager.tell(newMockFactory, kit.getRef());
365
366         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
367         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
368
369         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
370         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
371
372         LOG.info("testPerShardDatastoreContext ending");
373     }
374
375     @Test
376     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
377         new JavaTestKit(getSystem()) {{
378             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
379
380             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
381
382             shardManager.tell(new FindPrimary("non-existent", false), getRef());
383
384             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
385         }};
386     }
387
388     @Test
389     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
390         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting");
391         new JavaTestKit(getSystem()) {{
392             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
393
394             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
395
396             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
397             shardManager.tell(new ActorInitialized(), mockShardActor);
398
399             DataTree mockDataTree = mock(DataTree.class);
400             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
401                     DataStoreVersions.CURRENT_VERSION), getRef());
402
403             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
404             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
405                     RaftState.Leader.name())), mockShardActor);
406
407             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
408
409             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
410             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
411                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
412             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
413         }};
414
415         LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending");
416     }
417
418     @Test
419     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
420         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting");
421         new JavaTestKit(getSystem()) {{
422             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
423
424             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
425             shardManager.tell(new ActorInitialized(), mockShardActor);
426
427             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
428             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
429             shardManager.tell(new RoleChangeNotification(memberId1,
430                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
431             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
432
433             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
434
435             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
436         }};
437
438         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
439     }
440
441     @Test
442     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
443         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting");
444         new JavaTestKit(getSystem()) {{
445             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
446
447             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
448             shardManager.tell(new ActorInitialized(), mockShardActor);
449
450             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
451             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
452
453             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
454             shardManager.tell(new RoleChangeNotification(memberId1,
455                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
456             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
457             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
458                     leaderVersion), mockShardActor);
459
460             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
461
462             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
463             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
464                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
465             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
466         }};
467
468         LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending");
469     }
470
471     @Test
472     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
473         new JavaTestKit(getSystem()) {{
474             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
475
476             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
477
478             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
479         }};
480     }
481
482     @Test
483     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
484         new JavaTestKit(getSystem()) {{
485             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
486
487             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
488             shardManager.tell(new ActorInitialized(), mockShardActor);
489
490             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
491
492             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
493         }};
494     }
495
496     @Test
497     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
498         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
499         new JavaTestKit(getSystem()) {{
500             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
501
502             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
503             shardManager.tell(new ActorInitialized(), mockShardActor);
504
505             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
506             shardManager.tell(new RoleChangeNotification(memberId,
507                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
508
509             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
510
511             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
512
513             DataTree mockDataTree = mock(DataTree.class);
514             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
515                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
516
517             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
518
519             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
520             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
521                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
522             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
523         }};
524
525         LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting");
526     }
527
528     @Test
529     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
530         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting");
531         datastoreContextBuilder.shardInitializationTimeout(10, TimeUnit.SECONDS);
532         new JavaTestKit(getSystem()) {{
533             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
534
535             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
536
537             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
538             // delayed until we send ActorInitialized and RoleChangeNotification.
539             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
540
541             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
542
543             shardManager.tell(new ActorInitialized(), mockShardActor);
544
545             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
546
547             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
548             shardManager.tell(new RoleChangeNotification(memberId,
549                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
550
551             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
552
553             DataTree mockDataTree = mock(DataTree.class);
554             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
555                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
556
557             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
558             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
559                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
560             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
561
562             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
563         }};
564
565         LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
566     }
567
568     @Test
569     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
570         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting");
571         new JavaTestKit(getSystem()) {{
572             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
573
574             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
575
576             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
577
578             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
579
580             shardManager.tell(new ActorInitialized(), mockShardActor);
581
582             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
583         }};
584
585         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
586     }
587
588     @Test
589     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
590         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting");
591         new JavaTestKit(getSystem()) {{
592             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
593
594             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
595             shardManager.tell(new ActorInitialized(), mockShardActor);
596             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
597                     null, RaftState.Candidate.name()), mockShardActor);
598
599             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
600
601             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
602         }};
603
604         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
605     }
606
607     @Test
608     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
609         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting");
610         new JavaTestKit(getSystem()) {{
611             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
612
613             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
614             shardManager.tell(new ActorInitialized(), mockShardActor);
615             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
616                     null, RaftState.IsolatedLeader.name()), mockShardActor);
617
618             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
619
620             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
621         }};
622
623         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
624     }
625
626     @Test
627     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
628         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting");
629         new JavaTestKit(getSystem()) {{
630             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
631
632             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
633             shardManager.tell(new ActorInitialized(), mockShardActor);
634
635             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
636
637             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
638         }};
639
640         LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
641     }
642
643     @Test
644     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
645         LOG.info("testOnReceiveFindPrimaryForRemoteShard starting");
646         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
647
648         // Create an ActorSystem ShardManager actor for member-1.
649
650         final ActorSystem system1 = newActorSystem("Member1");
651         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
652
653         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
654                 newTestShardMgrBuilderWithMockShardActor().cluster(
655                         new ClusterWrapperImpl(system1)).props().withDispatcher(
656                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
657
658         // Create an ActorSystem ShardManager actor for member-2.
659
660         final ActorSystem system2 = newActorSystem("Member2");
661
662         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
663
664         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
665
666         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
667                 put("default", Arrays.asList("member-1", "member-2")).
668                 put("astronauts", Arrays.asList("member-2")).build());
669
670         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
671                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
672                         new ClusterWrapperImpl(system2)).props().withDispatcher(
673                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
674
675         new JavaTestKit(system1) {{
676
677             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
678             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
679
680             shardManager2.tell(new ActorInitialized(), mockShardActor2);
681
682             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
683             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
684             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
685                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
686             shardManager2.tell(new RoleChangeNotification(memberId2,
687                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
688
689             shardManager1.underlyingActor().waitForMemberUp();
690
691             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
692
693             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
694             String path = found.getPrimaryPath();
695             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
696             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
697
698             shardManager2.underlyingActor().verifyFindPrimary();
699
700             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
701
702             shardManager1.underlyingActor().waitForMemberRemoved();
703
704             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
705
706             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
707         }};
708
709         LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
710     }
711
712     @Test
713     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
714         LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting");
715         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
716
717         // Create an ActorSystem ShardManager actor for member-1.
718
719         final ActorSystem system1 = newActorSystem("Member1");
720         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
721
722         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
723
724         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
725                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
726                         new ClusterWrapperImpl(system1)).props().withDispatcher(
727                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
728
729         // Create an ActorSystem ShardManager actor for member-2.
730
731         final ActorSystem system2 = newActorSystem("Member2");
732
733         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
734
735         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
736
737         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
738             put("default", Arrays.asList("member-1", "member-2")).build());
739
740         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
741                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
742                         new ClusterWrapperImpl(system2)).props().withDispatcher(
743                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
744
745         new JavaTestKit(system1) {{
746
747             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
748             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
749             shardManager1.tell(new ActorInitialized(), mockShardActor1);
750             shardManager2.tell(new ActorInitialized(), mockShardActor2);
751
752             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
753             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
754             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
755                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
756             shardManager1.tell(new RoleChangeNotification(memberId1,
757                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
758             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
759                     DataStoreVersions.CURRENT_VERSION),
760                 mockShardActor2);
761             shardManager2.tell(new RoleChangeNotification(memberId2,
762                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
763             shardManager1.underlyingActor().waitForMemberUp();
764
765             shardManager1.tell(new FindPrimary("default", true), getRef());
766
767             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
768             String path = found.getPrimaryPath();
769             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
770
771             shardManager1.tell(MockClusterWrapper.
772                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
773
774             shardManager1.underlyingActor().waitForUnreachableMember();
775
776             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
777             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
778             MessageCollectorActor.clearMessages(mockShardActor1);
779
780             shardManager1.tell(MockClusterWrapper.
781                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
782
783             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
784
785             shardManager1.tell(new FindPrimary("default", true), getRef());
786
787             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
788
789             shardManager1.tell(MockClusterWrapper.
790                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
791
792             shardManager1.underlyingActor().waitForReachableMember();
793
794             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
795             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
796             MessageCollectorActor.clearMessages(mockShardActor1);
797
798             shardManager1.tell(new FindPrimary("default", true), getRef());
799
800             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
801             String path1 = found1.getPrimaryPath();
802             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
803
804             shardManager1.tell(MockClusterWrapper.
805                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
806
807             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
808
809             // Test FindPrimary wait succeeds after reachable member event.
810
811             shardManager1.tell(MockClusterWrapper.
812                     createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
813             shardManager1.underlyingActor().waitForUnreachableMember();
814
815             shardManager1.tell(new FindPrimary("default", true), getRef());
816
817             shardManager1.tell(MockClusterWrapper.
818                     createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
819
820             RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
821             String path2 = found2.getPrimaryPath();
822             assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
823         }};
824
825         LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending");
826     }
827
828     @Test
829     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
830         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting");
831         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
832
833         // Create an ActorSystem ShardManager actor for member-1.
834
835         final ActorSystem system1 = newActorSystem("Member1");
836         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
837
838         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
839
840         final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
841         final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
842                 newTestShardMgrBuilder().shardActor(mockShardActor1).cluster(
843                         new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props().
844                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
845
846         // Create an ActorSystem ShardManager actor for member-2.
847
848         final ActorSystem system2 = newActorSystem("Member2");
849
850         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
851
852         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
853
854         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
855             put("default", Arrays.asList("member-1", "member-2")).build());
856
857         final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
858                 newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster(
859                         new ClusterWrapperImpl(system2)).props().withDispatcher(
860                                 Dispatchers.DefaultDispatcherId()), shardManagerID);
861
862         new JavaTestKit(system1) {{
863             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
864             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
865             shardManager1.tell(new ActorInitialized(), mockShardActor1);
866             shardManager2.tell(new ActorInitialized(), mockShardActor2);
867
868             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
869             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
870             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
871                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
872             shardManager1.tell(new RoleChangeNotification(memberId1,
873                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
874             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
875                     DataStoreVersions.CURRENT_VERSION),
876                 mockShardActor2);
877             shardManager2.tell(new RoleChangeNotification(memberId2,
878                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
879             shardManager1.underlyingActor().waitForMemberUp();
880
881             shardManager1.tell(new FindPrimary("default", true), getRef());
882
883             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
884             String path = found.getPrimaryPath();
885             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
886
887             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
888                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
889
890             shardManager1.tell(MockClusterWrapper.
891                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
892
893             shardManager1.underlyingActor().waitForUnreachableMember();
894
895             shardManager1.tell(new FindPrimary("default", true), getRef());
896
897             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
898
899             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
900
901             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
902                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
903             shardManager1.tell(new RoleChangeNotification(memberId1,
904                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
905
906             shardManager1.tell(new FindPrimary("default", true), getRef());
907
908             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
909             String path1 = found1.getPrimaryPath();
910             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
911
912         }};
913
914         LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending");
915     }
916
917
918     @Test
919     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
920         new JavaTestKit(getSystem()) {{
921             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
922
923             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
924
925             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
926
927             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
928
929             assertEquals("getShardName", "non-existent", notFound.getShardName());
930         }};
931     }
932
933     @Test
934     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
935         new JavaTestKit(getSystem()) {{
936             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
937
938             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
939             shardManager.tell(new ActorInitialized(), mockShardActor);
940
941             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
942
943             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
944
945             assertTrue("Found path contains " + found.getPath().path().toString(),
946                     found.getPath().path().toString().contains("member-1-shard-default-config"));
947         }};
948     }
949
950     @Test
951     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
952         new JavaTestKit(getSystem()) {{
953             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
954
955             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
956
957             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
958         }};
959     }
960
961     @Test
962     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
963         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
964         new JavaTestKit(getSystem()) {{
965             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
966
967             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
968
969             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
970             // delayed until we send ActorInitialized.
971             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
972                     new Timeout(5, TimeUnit.SECONDS));
973
974             shardManager.tell(new ActorInitialized(), mockShardActor);
975
976             Object resp = Await.result(future, duration("5 seconds"));
977             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
978         }};
979
980         LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting");
981     }
982
983     @Test
984     public void testOnRecoveryJournalIsCleaned() {
985         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
986         InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo")));
987         InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar")));
988         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
989
990         TestShardManager shardManager = newTestShardManager();
991
992         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
993
994         // Journal entries up to the last one should've been deleted
995         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
996         synchronized (journal) {
997             assertEquals("Journal size", 0, journal.size());
998         }
999     }
1000
1001     @Test
1002     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
1003         TestShardManager shardManager = newTestShardManager();
1004
1005         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1006         shardManager.onReceiveCommand(new RoleChangeNotification(
1007                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
1008
1009         verify(ready, never()).countDown();
1010
1011         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
1012                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
1013
1014         verify(ready, times(1)).countDown();
1015     }
1016
1017     @Test
1018     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
1019         new JavaTestKit(getSystem()) {{
1020             TestShardManager shardManager = newTestShardManager();
1021
1022             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1023             shardManager.onReceiveCommand(new RoleChangeNotification(
1024                     memberId, null, RaftState.Follower.name()));
1025
1026             verify(ready, never()).countDown();
1027
1028             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1029
1030             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1031                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
1032                     DataStoreVersions.CURRENT_VERSION));
1033
1034             verify(ready, times(1)).countDown();
1035         }};
1036     }
1037
1038     @Test
1039     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
1040         new JavaTestKit(getSystem()) {{
1041             TestShardManager shardManager = newTestShardManager();
1042
1043             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1044             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
1045
1046             verify(ready, never()).countDown();
1047
1048             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
1049                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
1050                     DataStoreVersions.CURRENT_VERSION));
1051
1052             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
1053
1054             verify(ready, times(1)).countDown();
1055         }};
1056     }
1057
1058     @Test
1059     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
1060         TestShardManager shardManager = newTestShardManager();
1061
1062         shardManager.onReceiveCommand(new RoleChangeNotification(
1063                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
1064
1065         verify(ready, never()).countDown();
1066     }
1067
1068     @Test
1069     public void testByDefaultSyncStatusIsFalse() throws Exception{
1070         TestShardManager shardManager = newTestShardManager();
1071
1072         assertEquals(false, shardManager.getMBean().getSyncStatus());
1073     }
1074
1075     @Test
1076     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
1077         TestShardManager shardManager = newTestShardManager();
1078
1079         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
1080                 RaftState.Follower.name(), RaftState.Leader.name()));
1081
1082         assertEquals(true, shardManager.getMBean().getSyncStatus());
1083     }
1084
1085     @Test
1086     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
1087         TestShardManager shardManager = newTestShardManager();
1088
1089         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1090         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1091                 RaftState.Follower.name(), RaftState.Candidate.name()));
1092
1093         assertEquals(false, shardManager.getMBean().getSyncStatus());
1094
1095         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
1096         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
1097                 true, shardId));
1098
1099         assertEquals(false, shardManager.getMBean().getSyncStatus());
1100     }
1101
1102     @Test
1103     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
1104         TestShardManager shardManager = newTestShardManager();
1105
1106         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
1107         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
1108                 RaftState.Candidate.name(), RaftState.Follower.name()));
1109
1110         // Initially will be false
1111         assertEquals(false, shardManager.getMBean().getSyncStatus());
1112
1113         // Send status true will make sync status true
1114         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
1115
1116         assertEquals(true, shardManager.getMBean().getSyncStatus());
1117
1118         // Send status false will make sync status false
1119         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
1120
1121         assertEquals(false, shardManager.getMBean().getSyncStatus());
1122
1123     }
1124
1125     @Test
1126     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1127         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting");
1128         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1129             @Override
1130             public List<String> getMemberShardNames(String memberName) {
1131                 return Arrays.asList("default", "astronauts");
1132             }
1133         }));
1134
1135         // Initially will be false
1136         assertEquals(false, shardManager.getMBean().getSyncStatus());
1137
1138         // Make default shard leader
1139         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1140         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1141                 RaftState.Follower.name(), RaftState.Leader.name()));
1142
1143         // default = Leader, astronauts is unknown so sync status remains false
1144         assertEquals(false, shardManager.getMBean().getSyncStatus());
1145
1146         // Make astronauts shard leader as well
1147         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1148         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1149                 RaftState.Follower.name(), RaftState.Leader.name()));
1150
1151         // Now sync status should be true
1152         assertEquals(true, shardManager.getMBean().getSyncStatus());
1153
1154         // Make astronauts a Follower
1155         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1156                 RaftState.Leader.name(), RaftState.Follower.name()));
1157
1158         // Sync status is not true
1159         assertEquals(false, shardManager.getMBean().getSyncStatus());
1160
1161         // Make the astronauts follower sync status true
1162         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1163
1164         // Sync status is now true
1165         assertEquals(true, shardManager.getMBean().getSyncStatus());
1166
1167         LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
1168     }
1169
1170     @Test
1171     public void testOnReceiveSwitchShardBehavior() throws Exception {
1172         new JavaTestKit(getSystem()) {{
1173             final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1174
1175             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1176             shardManager.tell(new ActorInitialized(), mockShardActor);
1177
1178             shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), getRef());
1179
1180             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1181
1182             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1183             assertEquals(1000, switchBehavior.getNewTerm());
1184         }};
1185     }
1186
1187     @Test
1188     public void testOnCreateShard() {
1189         LOG.info("testOnCreateShard starting");
1190         new JavaTestKit(getSystem()) {{
1191             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1192
1193             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1194                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1195
1196             SchemaContext schemaContext = TestModel.createTestContext();
1197             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1198
1199             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1200                     persistent(false).build();
1201             Shard.Builder shardBuilder = Shard.builder();
1202
1203             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1204                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1205             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1206
1207             expectMsgClass(duration("5 seconds"), Success.class);
1208
1209             shardManager.tell(new FindLocalShard("foo", true), getRef());
1210
1211             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1212
1213             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1214             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1215                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1216             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1217                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1218                     shardBuilder.getPeerAddresses().keySet());
1219             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1220                     shardBuilder.getId());
1221             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1222
1223             // Send CreateShard with same name - should return Success with a message.
1224
1225             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1226
1227             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1228             assertNotNull("Success status is null", success.status());
1229         }};
1230
1231         LOG.info("testOnCreateShard ending");
1232     }
1233
1234     @Test
1235     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1236         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting");
1237         new JavaTestKit(getSystem()) {{
1238             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1239
1240             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1241                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1242
1243             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1244
1245             Shard.Builder shardBuilder = Shard.builder();
1246             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1247                     "foo", null, Arrays.asList("member-5", "member-6"));
1248
1249             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1250             expectMsgClass(duration("5 seconds"), Success.class);
1251
1252             shardManager.tell(new FindLocalShard("foo", true), getRef());
1253             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1254
1255             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1256             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1257                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1258         }};
1259
1260         LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending");
1261     }
1262
1263     @Test
1264     public void testOnCreateShardWithNoInitialSchemaContext() {
1265         LOG.info("testOnCreateShardWithNoInitialSchemaContext starting");
1266         new JavaTestKit(getSystem()) {{
1267             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1268                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1269
1270             Shard.Builder shardBuilder = Shard.builder();
1271
1272             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1273                     "foo", null, Arrays.asList("member-1"));
1274             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1275
1276             expectMsgClass(duration("5 seconds"), Success.class);
1277
1278             SchemaContext schemaContext = TestModel.createTestContext();
1279             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1280
1281             shardManager.tell(new FindLocalShard("foo", true), getRef());
1282
1283             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1284
1285             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1286             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1287         }};
1288
1289         LOG.info("testOnCreateShardWithNoInitialSchemaContext ending");
1290     }
1291
1292     @Test
1293     public void testGetSnapshot() throws Throwable {
1294         LOG.info("testGetSnapshot starting");
1295         JavaTestKit kit = new JavaTestKit(getSystem());
1296
1297         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1298                    put("shard1", Arrays.asList("member-1")).
1299                    put("shard2", Arrays.asList("member-1")).
1300                    put("astronauts", Collections.<String>emptyList()).build());
1301
1302         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig).
1303                 withDispatcher(Dispatchers.DefaultDispatcherId()));
1304
1305         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1306         Failure failure = kit.expectMsgClass(Failure.class);
1307         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1308
1309         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1310
1311         waitForShardInitialized(shardManager, "shard1", kit);
1312         waitForShardInitialized(shardManager, "shard2", kit);
1313
1314         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1315
1316         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1317
1318         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1319         assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
1320
1321         Function<ShardSnapshot, String> shardNameTransformer = new Function<ShardSnapshot, String>() {
1322             @Override
1323             public String apply(ShardSnapshot s) {
1324                 return s.getName();
1325             }
1326         };
1327
1328         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
1329                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1330
1331         // Add a new replica
1332
1333         JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1334
1335         TestShardManager shardManagerInstance = shardManager.underlyingActor();
1336         shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1337
1338         shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
1339         mockShardLeaderKit.expectMsgClass(AddServer.class);
1340         mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, ""));
1341         kit.expectMsgClass(Status.Success.class);
1342         waitForShardInitialized(shardManager, "astronauts", kit);
1343
1344         // Send another GetSnapshot and verify
1345
1346         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1347         datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1348
1349         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
1350                 Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
1351
1352         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1353         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1354         ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes);
1355         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1356                 Sets.newHashSet(snapshot.getShardList()));
1357
1358         LOG.info("testGetSnapshot ending");
1359     }
1360
1361     @Test
1362     public void testRestoreFromSnapshot() throws Throwable {
1363         LOG.info("testRestoreFromSnapshot starting");
1364
1365         datastoreContextBuilder.shardInitializationTimeout(3, TimeUnit.SECONDS);
1366
1367         JavaTestKit kit = new JavaTestKit(getSystem());
1368
1369         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1370                    put("shard1", Collections.<String>emptyList()).
1371                    put("shard2", Collections.<String>emptyList()).
1372                    put("astronauts", Collections.<String>emptyList()).build());
1373
1374
1375         ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
1376         DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix,
1377                 SerializationUtils.serialize(snapshot), Collections.<ShardSnapshot>emptyList());
1378         TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig).
1379                 restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
1380
1381         shardManager.underlyingActor().waitForRecoveryComplete();
1382
1383         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1384
1385         waitForShardInitialized(shardManager, "shard1", kit);
1386         waitForShardInitialized(shardManager, "shard2", kit);
1387         waitForShardInitialized(shardManager, "astronauts", kit);
1388
1389         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1390
1391         DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
1392
1393         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1394
1395         byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot();
1396         assertNotNull("Expected ShardManagerSnapshot", snapshotBytes);
1397         snapshot = SerializationUtils.deserialize(snapshotBytes);
1398         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"),
1399                 Sets.newHashSet(snapshot.getShardList()));
1400
1401         LOG.info("testRestoreFromSnapshot ending");
1402     }
1403
1404     @Test
1405     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1406         new JavaTestKit(getSystem()) {{
1407             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1408                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1409
1410             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1411             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1412
1413             assertEquals("Failure obtained", true,
1414                           (resp.cause() instanceof IllegalArgumentException));
1415         }};
1416     }
1417
1418     @Test
1419     public void testAddShardReplica() throws Exception {
1420         LOG.info("testAddShardReplica starting");
1421         MockConfiguration mockConfig =
1422                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1423                    put("default", Arrays.asList("member-1", "member-2")).
1424                    put("astronauts", Arrays.asList("member-2")).build());
1425
1426         final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1427         datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
1428
1429         // Create an ActorSystem ShardManager actor for member-1.
1430         final ActorSystem system1 = newActorSystem("Member1");
1431         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1432         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1433         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1434                 newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster(
1435                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1436
1437         // Create an ActorSystem ShardManager actor for member-2.
1438         final ActorSystem system2 = newActorSystem("Member2");
1439         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1440
1441         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1442         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1443                 TestActorRef.create(system2, Props.create(MockRespondActor.class).
1444                         withDispatcher(Dispatchers.DefaultDispatcherId()), name);
1445         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1446                 newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
1447                         new ClusterWrapperImpl(system2)).props().
1448                             withDispatcher(Dispatchers.DefaultDispatcherId()), shardManagerID);
1449
1450         new JavaTestKit(system1) {{
1451
1452             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1453             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1454
1455             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1456
1457             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1458             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1459             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1460                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1461             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1462                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1463
1464             newReplicaShardManager.underlyingActor().waitForMemberUp();
1465             leaderShardManager.underlyingActor().waitForMemberUp();
1466
1467             //Have a dummy snapshot to be overwritten by the new data persisted.
1468             String[] restoredShards = {"default", "people"};
1469             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1470             InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
1471             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
1472
1473             InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
1474             InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
1475
1476             //construct a mock response message
1477             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1478             mockShardLeaderActor.underlyingActor().updateResponse(response);
1479             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1480             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1481                 AddServer.class);
1482             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1483             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1484             expectMsgClass(duration("5 seconds"), Status.Success.class);
1485
1486             InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
1487             InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
1488             List<ShardManagerSnapshot> persistedSnapshots =
1489                 InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
1490             assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
1491             ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
1492             assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
1493                     Sets.newHashSet(shardManagerSnapshot.getShardList()));
1494         }};
1495         LOG.info("testAddShardReplica ending");
1496     }
1497
1498     @Test
1499     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1500         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting");
1501         new JavaTestKit(getSystem()) {{
1502             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1503                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1504
1505             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1506             shardManager.tell(new ActorInitialized(), mockShardActor);
1507
1508             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1509             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1510             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1511                     Props.create(MockRespondActor.class, addServerReply), leaderId);
1512
1513             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1514
1515             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1516             shardManager.tell(new RoleChangeNotification(newReplicaId,
1517                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1518             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
1519                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1520
1521             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1522
1523             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1524
1525             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1526             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1527
1528             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1529             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1530
1531             // Send message again to verify previous in progress state is cleared
1532
1533             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1534             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1535             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1536
1537             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1538
1539             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1540                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1541             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1542             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1543             expectMsgClass(duration("5 seconds"), Failure.class);
1544
1545             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1546             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1547         }};
1548
1549         LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
1550     }
1551
1552     @Test
1553     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1554         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");
1555         new JavaTestKit(getSystem()) {{
1556             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1557             ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
1558
1559             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1560             shardManager.tell(new ActorInitialized(), mockShardActor);
1561             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1562                     DataStoreVersions.CURRENT_VERSION), getRef());
1563             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1564                     RaftState.Leader.name())), mockShardActor);
1565
1566             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1567             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1568             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1569
1570             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1571             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1572         }};
1573
1574         LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
1575     }
1576
1577     @Test
1578     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1579         LOG.info("testAddShardReplicaWithAddServerReplyFailure starting");
1580         new JavaTestKit(getSystem()) {{
1581             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1582
1583             MockConfiguration mockConfig =
1584                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1585                        put("astronauts", Arrays.asList("member-2")).build());
1586
1587             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1588             final TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1589                     newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID);
1590             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1591
1592             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1593
1594             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1595             terminateWatcher.watch(mockNewReplicaShardActor);
1596
1597             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1598
1599             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1600             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1601                     addServerMsg.getNewServerId());
1602             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1603
1604             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1605             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1606
1607             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1608             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1609
1610             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1611
1612             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1613             mockShardLeaderKit.expectMsgClass(AddServer.class);
1614             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1615             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1616             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1617         }};
1618
1619         LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
1620     }
1621
1622     @Test
1623     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1624         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1625                 AddServer.class, new AddShardReplica("astronauts"));
1626     }
1627
1628     @Test
1629     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1630         LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting");
1631         datastoreContextBuilder.shardInitializationTimeout(100, TimeUnit.MILLISECONDS);
1632         new JavaTestKit(getSystem()) {{
1633             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1634                        put("astronauts", Arrays.asList("member-2")).build());
1635
1636             final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig).
1637                     shardActor(mockShardActor).props(), shardMgrID);
1638
1639             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1640             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
1641                     AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
1642
1643             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1644             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1645             assertEquals("Failure obtained", true,
1646                           (resp.cause() instanceof RuntimeException));
1647         }};
1648
1649         LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
1650     }
1651
1652     @Test
1653     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1654         new JavaTestKit(getSystem()) {{
1655             ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
1656                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1657
1658             shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef());
1659             Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class);
1660             assertEquals("Failure obtained", true,
1661                          (resp.cause() instanceof PrimaryNotFoundException));
1662         }};
1663     }
1664
1665     @Test
1666     /**
1667      * Primary is Local
1668      */
1669     public void testRemoveShardReplicaLocal() throws Exception {
1670         new JavaTestKit(getSystem()) {{
1671             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1672
1673             final TestActorRef<MockRespondActor> respondActor =
1674                     TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
1675
1676             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
1677
1678             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1679             shardManager.tell(new ActorInitialized(), respondActor);
1680             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1681                     DataStoreVersions.CURRENT_VERSION), getRef());
1682             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1683                     RaftState.Leader.name())), respondActor);
1684
1685             respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
1686             shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef());
1687             final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
1688             assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(),
1689                     removeServer.getServerId());
1690             expectMsgClass(duration("5 seconds"), Success.class);
1691         }};
1692     }
1693
1694     @Test
1695     public void testRemoveShardReplicaRemote() throws Exception {
1696         MockConfiguration mockConfig =
1697                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1698                         put("default", Arrays.asList("member-1", "member-2")).
1699                         put("astronauts", Arrays.asList("member-1")).build());
1700
1701         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1702
1703         // Create an ActorSystem ShardManager actor for member-1.
1704         final ActorSystem system1 = newActorSystem("Member1");
1705         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1706         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1707
1708         final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
1709                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster(
1710                         new ClusterWrapperImpl(system1)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1711                 shardManagerID);
1712
1713         // Create an ActorSystem ShardManager actor for member-2.
1714         final ActorSystem system2 = newActorSystem("Member2");
1715         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1716
1717         String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString();
1718         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1719                 TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1720
1721         LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
1722
1723         final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
1724                 newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster(
1725                         new ClusterWrapperImpl(system2)).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1726                 shardManagerID);
1727
1728         // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
1729         //    akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
1730         // However when a shard manager has a local shard which is a follower and a leader that is remote it will
1731         // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
1732         // look like so,
1733         //    akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
1734         // In this specific case if we did a FindPrimary for shard default from member-1 we would come up
1735         // with the address of an actor which does not exist, therefore any message sent to that actor would go to
1736         // dead letters.
1737         // To work around this problem we create a ForwardingActor with the right address and pass to it the
1738         // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every
1739         // thing works as expected
1740         final ActorRef actorRef = leaderShardManager.underlyingActor().context()
1741                 .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix);
1742
1743         LOG.error("Forwarding actor : {}", actorRef);
1744
1745         new JavaTestKit(system1) {{
1746
1747             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1748             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1749
1750             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1751             newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1752
1753             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
1754             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1755             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1756                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1757             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1758                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1759
1760             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
1761             newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2,
1762                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor);
1763             newReplicaShardManager.tell(new RoleChangeNotification(memberId1,
1764                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1765
1766             newReplicaShardManager.underlyingActor().waitForMemberUp();
1767             leaderShardManager.underlyingActor().waitForMemberUp();
1768
1769             //construct a mock response message
1770             RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
1771             mockShardLeaderActor.underlyingActor().updateResponse(response);
1772             newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef());
1773             RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1774                     RemoveServer.class);
1775             String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString();
1776             assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
1777             expectMsgClass(duration("5 seconds"), Status.Success.class);
1778         }};
1779
1780     }
1781
1782     @Test
1783     public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception {
1784         testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"),
1785                 RemoveServer.class, new RemoveShardReplica("astronauts", "member-3"));
1786     }
1787
1788     @Test
1789     public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception {
1790         testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"),
1791                 AddServer.class, new RemoveShardReplica("astronauts", "member-2"));
1792     }
1793
1794
1795     public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
1796                                                       final Class<?> firstForwardedServerChangeClass,
1797                                                       final Object secondServerChange) throws Exception {
1798         new JavaTestKit(getSystem()) {{
1799             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1800             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1801
1802             MockConfiguration mockConfig =
1803                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1804                             put(shardName, Arrays.asList("member-2")).build());
1805
1806             final TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1807                     newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster(
1808                             new MockClusterWrapper()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1809                     shardMgrID);
1810
1811             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1812
1813             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1814
1815             shardManager.tell(firstServerChange, getRef());
1816
1817             mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass);
1818
1819             shardManager.tell(secondServerChange, secondRequestKit.getRef());
1820
1821             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1822         }};
1823     }
1824
1825     @Test
1826     public void testServerRemovedShardActorNotRunning() throws Exception {
1827         LOG.info("testServerRemovedShardActorNotRunning starting");
1828         new JavaTestKit(getSystem()) {{
1829             MockConfiguration mockConfig =
1830                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1831                             put("default", Arrays.asList("member-1", "member-2")).
1832                             put("astronauts", Arrays.asList("member-2")).
1833                             put("people", Arrays.asList("member-1", "member-2")).build());
1834
1835             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig));
1836
1837             shardManager.underlyingActor().waitForRecoveryComplete();
1838             shardManager.tell(new FindLocalShard("people", false), getRef());
1839             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1840
1841             shardManager.tell(new FindLocalShard("default", false), getRef());
1842             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1843
1844             // Removed the default shard replica from member-1
1845             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1846             ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build();
1847             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1848
1849             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1850         }};
1851
1852         LOG.info("testServerRemovedShardActorNotRunning ending");
1853     }
1854
1855     @Test
1856     public void testServerRemovedShardActorRunning() throws Exception {
1857         LOG.info("testServerRemovedShardActorRunning starting");
1858         new JavaTestKit(getSystem()) {{
1859             MockConfiguration mockConfig =
1860                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1861                             put("default", Arrays.asList("member-1", "member-2")).
1862                             put("astronauts", Arrays.asList("member-2")).
1863                             put("people", Arrays.asList("member-1", "member-2")).build());
1864
1865             String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1").
1866                     type(shardMrgIDSuffix).build().toString();
1867             TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
1868                     MessageCollectorActor.props(), shardId);
1869
1870             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(
1871                     newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props());
1872
1873             shardManager.underlyingActor().waitForRecoveryComplete();
1874
1875             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1876             shardManager.tell(new ActorInitialized(), shard);
1877
1878             waitForShardInitialized(shardManager, "people", this);
1879             waitForShardInitialized(shardManager, "default", this);
1880
1881             // Removed the default shard replica from member-1
1882             shardManager.tell(new ServerRemoved(shardId), getRef());
1883
1884             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1885
1886             MessageCollectorActor.expectFirstMatching(shard, Shutdown.class);
1887         }};
1888
1889         LOG.info("testServerRemovedShardActorRunning ending");
1890     }
1891
1892
1893     @Test
1894     public void testShardPersistenceWithRestoredData() throws Exception {
1895         LOG.info("testShardPersistenceWithRestoredData starting");
1896         new JavaTestKit(getSystem()) {{
1897             MockConfiguration mockConfig =
1898                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1899                    put("default", Arrays.asList("member-1", "member-2")).
1900                    put("astronauts", Arrays.asList("member-2")).
1901                    put("people", Arrays.asList("member-1", "member-2")).build());
1902             String[] restoredShards = {"default", "astronauts"};
1903             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1904             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1905
1906             //create shardManager to come up with restored data
1907             TestActorRef<TestShardManager> newRestoredShardManager = actorFactory.createTestActor(
1908                     newShardMgrProps(mockConfig));
1909
1910             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1911
1912             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1913             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1914             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1915
1916             //Verify a local shard is created for the restored shards,
1917             //although we expect a NotInitializedException for the shards as the actor initialization
1918             //message is not sent for them
1919             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1920             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1921
1922             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1923             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1924         }};
1925
1926         LOG.info("testShardPersistenceWithRestoredData ending");
1927     }
1928
1929     @Test
1930     public void testShutDown() throws Exception {
1931         LOG.info("testShutDown starting");
1932         new JavaTestKit(getSystem()) {{
1933             MockConfiguration mockConfig =
1934                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1935                             put("shard1", Arrays.asList("member-1")).
1936                             put("shard2", Arrays.asList("member-1")).build());
1937
1938             String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName("member-1").
1939                     type(shardMrgIDSuffix).build().toString();
1940             TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
1941                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
1942
1943             String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName("member-1").
1944                     type(shardMrgIDSuffix).build().toString();
1945             TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
1946                     MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
1947
1948             TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(
1949                     mockConfig).addShardActor("shard1", shard1).addShardActor("shard2", shard2).props().
1950                         withDispatcher(Dispatchers.DefaultDispatcherId()));
1951
1952             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1953             shardManager.tell(new ActorInitialized(), shard1);
1954             shardManager.tell(new ActorInitialized(), shard2);
1955
1956             FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1957             Future<Boolean> stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE);
1958
1959             MessageCollectorActor.expectFirstMatching(shard1, Shutdown.class);
1960             MessageCollectorActor.expectFirstMatching(shard2, Shutdown.class);
1961
1962             try {
1963                 Await.ready(stopFuture, FiniteDuration.create(500, TimeUnit.MILLISECONDS));
1964                 fail("ShardManager actor stopped without waiting for the Shards to be stopped");
1965             } catch(TimeoutException e) {
1966                 // expected
1967             }
1968
1969             actorFactory.killActor(shard1, this);
1970             actorFactory.killActor(shard2, this);
1971
1972             Boolean stopped = Await.result(stopFuture, duration);
1973             assertEquals("Stopped", Boolean.TRUE, stopped);
1974         }};
1975
1976         LOG.info("testShutDown ending");
1977     }
1978
1979     private static class TestShardManager extends ShardManager {
1980         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1981         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1982         private ShardManagerSnapshot snapshot;
1983         private final Map<String, ActorRef> shardActors;
1984         private final ActorRef shardActor;
1985         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1986         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1987         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1988         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1989         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1990         private volatile MessageInterceptor messageInterceptor;
1991
1992         private TestShardManager(Builder builder) {
1993             super(builder);
1994             shardActor = builder.shardActor;
1995             shardActors = builder.shardActors;
1996         }
1997
1998         @Override
1999         protected void handleRecover(Object message) throws Exception {
2000             try {
2001                 super.handleRecover(message);
2002             } finally {
2003                 if(message instanceof RecoveryCompleted) {
2004                     recoveryComplete.countDown();
2005                 }
2006             }
2007         }
2008
2009         @Override
2010         public void handleCommand(Object message) throws Exception {
2011             try{
2012                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
2013                     getSender().tell(messageInterceptor.apply(message), getSelf());
2014                 } else {
2015                     super.handleCommand(message);
2016                 }
2017             } finally {
2018                 if(message instanceof FindPrimary) {
2019                     findPrimaryMessageReceived.countDown();
2020                 } else if(message instanceof ClusterEvent.MemberUp) {
2021                     String role = ((ClusterEvent.MemberUp)message).member().roles().iterator().next();
2022                     if(!getCluster().getCurrentMemberName().equals(role)) {
2023                         memberUpReceived.countDown();
2024                     }
2025                 } else if(message instanceof ClusterEvent.MemberRemoved) {
2026                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().iterator().next();
2027                     if(!getCluster().getCurrentMemberName().equals(role)) {
2028                         memberRemovedReceived.countDown();
2029                     }
2030                 } else if(message instanceof ClusterEvent.UnreachableMember) {
2031                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().iterator().next();
2032                     if(!getCluster().getCurrentMemberName().equals(role)) {
2033                         memberUnreachableReceived.countDown();
2034                     }
2035                 } else if(message instanceof ClusterEvent.ReachableMember) {
2036                     String role = ((ClusterEvent.ReachableMember)message).member().roles().iterator().next();
2037                     if(!getCluster().getCurrentMemberName().equals(role)) {
2038                         memberReachableReceived.countDown();
2039                     }
2040                 }
2041             }
2042         }
2043
2044         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
2045             this.messageInterceptor = messageInterceptor;
2046         }
2047
2048         void waitForRecoveryComplete() {
2049             assertEquals("Recovery complete", true,
2050                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
2051         }
2052
2053         void waitForMemberUp() {
2054             assertEquals("MemberUp received", true,
2055                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
2056             memberUpReceived = new CountDownLatch(1);
2057         }
2058
2059         void waitForMemberRemoved() {
2060             assertEquals("MemberRemoved received", true,
2061                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
2062             memberRemovedReceived = new CountDownLatch(1);
2063         }
2064
2065         void waitForUnreachableMember() {
2066             assertEquals("UnreachableMember received", true,
2067                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
2068                 ));
2069             memberUnreachableReceived = new CountDownLatch(1);
2070         }
2071
2072         void waitForReachableMember() {
2073             assertEquals("ReachableMember received", true,
2074                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
2075             memberReachableReceived = new CountDownLatch(1);
2076         }
2077
2078         void verifyFindPrimary() {
2079             assertEquals("FindPrimary received", true,
2080                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
2081             findPrimaryMessageReceived = new CountDownLatch(1);
2082         }
2083
2084         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
2085             return new Builder(datastoreContextBuilder);
2086         }
2087
2088         private static class Builder extends AbstractGenericBuilder<Builder, TestShardManager> {
2089             private ActorRef shardActor;
2090             private final Map<String, ActorRef> shardActors = new HashMap<>();
2091
2092             Builder(DatastoreContext.Builder datastoreContextBuilder) {
2093                 super(TestShardManager.class);
2094                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
2095             }
2096
2097             Builder shardActor(ActorRef shardActor) {
2098                 this.shardActor = shardActor;
2099                 return this;
2100             }
2101
2102             Builder addShardActor(String shardName, ActorRef actorRef){
2103                 shardActors.put(shardName, actorRef);
2104                 return this;
2105             }
2106         }
2107
2108         @Override
2109         public void saveSnapshot(Object obj) {
2110             snapshot = (ShardManagerSnapshot) obj;
2111             snapshotPersist.countDown();
2112             super.saveSnapshot(obj);
2113         }
2114
2115         void verifySnapshotPersisted(Set<String> shardList) {
2116             assertEquals("saveSnapshot invoked", true,
2117                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
2118             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
2119         }
2120
2121         @Override
2122         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
2123             if(shardActors.get(info.getShardName()) != null){
2124                 return shardActors.get(info.getShardName());
2125             }
2126
2127             if(shardActor != null) {
2128                 return shardActor;
2129             }
2130
2131             return super.newShardActor(schemaContext, info);
2132         }
2133     }
2134
2135     private static abstract class AbstractGenericBuilder<T extends AbstractGenericBuilder<T, ?>, C extends ShardManager>
2136                                                      extends ShardManager.AbstractBuilder<T> {
2137         private final Class<C> shardManagerClass;
2138
2139         AbstractGenericBuilder(Class<C> shardManagerClass) {
2140             this.shardManagerClass = shardManagerClass;
2141             cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
2142                     waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
2143         }
2144
2145         @Override
2146         public Props props() {
2147             verify();
2148             return Props.create(shardManagerClass, this);
2149         }
2150     }
2151
2152     private static class GenericBuilder<C extends ShardManager> extends AbstractGenericBuilder<GenericBuilder<C>, C> {
2153         GenericBuilder(Class<C> shardManagerClass) {
2154             super(shardManagerClass);
2155         }
2156     }
2157
2158     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
2159         private static final long serialVersionUID = 1L;
2160         private final Creator<ShardManager> delegate;
2161
2162         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
2163             this.delegate = delegate;
2164         }
2165
2166         @Override
2167         public ShardManager create() throws Exception {
2168             return delegate.create();
2169         }
2170     }
2171
2172     interface MessageInterceptor extends Function<Object, Object> {
2173         boolean canIntercept(Object message);
2174     }
2175
2176     private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
2177         return new MessageInterceptor(){
2178             @Override
2179             public Object apply(Object message) {
2180                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
2181             }
2182
2183             @Override
2184             public boolean canIntercept(Object message) {
2185                 return message instanceof FindPrimary;
2186             }
2187         };
2188     }
2189
2190     private static class MockRespondActor extends MessageCollectorActor {
2191         static final String CLEAR_RESPONSE = "clear-response";
2192         static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
2193
2194         private volatile Object responseMsg;
2195
2196         @SuppressWarnings("unused")
2197         public MockRespondActor() {
2198         }
2199
2200         @SuppressWarnings("unused")
2201         public MockRespondActor(Object responseMsg) {
2202             this.responseMsg = responseMsg;
2203         }
2204
2205         public void updateResponse(Object response) {
2206             responseMsg = response;
2207         }
2208
2209         @Override
2210         public void onReceive(Object message) throws Exception {
2211             if(!"get-all-messages".equals(message)) {
2212                 LOG.debug("Received message : {}", message);
2213             }
2214             super.onReceive(message);
2215             if (message instanceof AddServer && responseMsg != null) {
2216                 getSender().tell(responseMsg, getSelf());
2217             } else if(message instanceof RemoveServer && responseMsg != null){
2218                 getSender().tell(responseMsg, getSelf());
2219             } else if(message.equals(CLEAR_RESPONSE)) {
2220                 responseMsg = null;
2221             }
2222         }
2223     }
2224 }