BUG 2817 : Handle ServerRemoved message in Shard/ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Status;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.actor.Terminated;
29 import akka.cluster.Cluster;
30 import akka.cluster.ClusterEvent;
31 import akka.dispatch.Dispatchers;
32 import akka.japi.Creator;
33 import akka.pattern.Patterns;
34 import akka.persistence.RecoveryCompleted;
35 import akka.serialization.Serialization;
36 import akka.testkit.JavaTestKit;
37 import akka.testkit.TestActorRef;
38 import akka.util.Timeout;
39 import com.google.common.base.Function;
40 import com.google.common.base.Optional;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.collect.Sets;
44 import com.google.common.util.concurrent.Uninterruptibles;
45 import com.typesafe.config.ConfigFactory;
46 import java.net.URI;
47 import java.util.AbstractMap;
48 import java.util.Arrays;
49 import java.util.Collection;
50 import java.util.Collections;
51 import java.util.HashMap;
52 import java.util.HashSet;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Map.Entry;
56 import java.util.Set;
57 import java.util.concurrent.CountDownLatch;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.TimeoutException;
60 import org.junit.After;
61 import org.junit.Before;
62 import org.junit.Test;
63 import org.mockito.Mock;
64 import org.mockito.Mockito;
65 import org.mockito.MockitoAnnotations;
66 import org.opendaylight.controller.cluster.datastore.config.Configuration;
67 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
68 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
69 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
70 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
71 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
72 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
73 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
74 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
75 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
76 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
77 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
78 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
79 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
80 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
81 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
82 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
83 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
84 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
85 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
86 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
87 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
88 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
89 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
90 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
91 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
92 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
93 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
94 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
95 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
96 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
97 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
98 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
99 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
100 import org.opendaylight.controller.cluster.raft.RaftState;
101 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
102 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
103 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
104 import org.opendaylight.controller.cluster.raft.messages.AddServer;
105 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
106 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
107 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
108 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
109 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
110 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
111 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
112 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
114 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
115 import scala.concurrent.Await;
116 import scala.concurrent.Future;
117 import scala.concurrent.duration.FiniteDuration;
118
119 public class ShardManagerTest extends AbstractActorTest {
120     private static int ID_COUNTER = 1;
121
122     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
123     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
124
125     @Mock
126     private static CountDownLatch ready;
127
128     private static TestActorRef<MessageCollectorActor> mockShardActor;
129
130     private static String mockShardName;
131
132     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
133             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
134                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
135
136     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
137         String name = new ShardIdentifier(shardName, memberName,"config").toString();
138         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
139     }
140
141     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
142
143     @Before
144     public void setUp() {
145         MockitoAnnotations.initMocks(this);
146
147         InMemoryJournal.clear();
148         InMemorySnapshotStore.clear();
149
150         if(mockShardActor == null) {
151             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
152             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
153         }
154
155         mockShardActor.underlyingActor().clear();
156     }
157
158     @After
159     public void tearDown() {
160         InMemoryJournal.clear();
161         InMemorySnapshotStore.clear();
162     }
163
164     private Props newShardMgrProps() {
165         return newShardMgrProps(new MockConfiguration());
166     }
167
168     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
169         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
170         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
171         Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString());
172         return mockFactory;
173     }
174
175     private Props newShardMgrProps(Configuration config) {
176         return TestShardManager.builder(datastoreContextBuilder).configuration(config).props();
177     }
178
179     private Props newPropsShardMgrWithMockShardActor() {
180         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
181                 new MockConfiguration());
182     }
183
184     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
185             final ClusterWrapper clusterWrapper, final Configuration config) {
186         Creator<ShardManager> creator = new Creator<ShardManager>() {
187             private static final long serialVersionUID = 1L;
188             @Override
189             public ShardManager create() throws Exception {
190                 return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config).
191                         datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
192                         waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor);
193             }
194         };
195
196         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
197     }
198
199     private TestShardManager newTestShardManager() {
200         return newTestShardManager(newShardMgrProps());
201     }
202
203     private TestShardManager newTestShardManager(Props props) {
204         TestActorRef<TestShardManager> shardManagerActor = TestActorRef.create(getSystem(), props);
205         TestShardManager shardManager = shardManagerActor.underlyingActor();
206         shardManager.waitForRecoveryComplete();
207         return shardManager;
208     }
209
210     @Test
211     public void testPerShardDatastoreContext() throws Exception {
212         final DatastoreContextFactory mockFactory = newDatastoreContextFactory(
213                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
214
215         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
216                 shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default");
217
218         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
219                 shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology");
220
221         final MockConfiguration mockConfig = new MockConfiguration() {
222             @Override
223             public Collection<String> getMemberShardNames(String memberName) {
224                 return Arrays.asList("default", "topology");
225             }
226
227             @Override
228             public Collection<String> getMembersFromShardName(String shardName) {
229                 return Arrays.asList("member-1");
230             }
231         };
232
233         final TestActorRef<MessageCollectorActor> defaultShardActor = TestActorRef.create(getSystem(),
234                 Props.create(MessageCollectorActor.class), "default");
235         final TestActorRef<MessageCollectorActor> topologyShardActor = TestActorRef.create(getSystem(),
236                 Props.create(MessageCollectorActor.class), "topology");
237
238         final Map<String, Entry<ActorRef, DatastoreContext>> shardInfoMap = Collections.synchronizedMap(
239                 new HashMap<String, Entry<ActorRef, DatastoreContext>>());
240         shardInfoMap.put("default", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(defaultShardActor, null));
241         shardInfoMap.put("topology", new AbstractMap.SimpleEntry<ActorRef, DatastoreContext>(topologyShardActor, null));
242
243         final CountDownLatch newShardActorLatch = new CountDownLatch(2);
244         final Creator<ShardManager> creator = new Creator<ShardManager>() {
245             private static final long serialVersionUID = 1L;
246             @Override
247             public ShardManager create() throws Exception {
248                 return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig).
249                         datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready).
250                         primaryShardInfoCache(primaryShardInfoCache)) {
251                     @Override
252                     protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
253                         Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
254                         ActorRef ref = null;
255                         if(entry != null) {
256                             ref = entry.getKey();
257                             entry.setValue(info.getDatastoreContext());
258                         }
259
260                         newShardActorLatch.countDown();
261                         return ref;
262                     }
263                 };
264             }
265         };
266
267         JavaTestKit kit = new JavaTestKit(getSystem());
268
269         final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)).
270                     withDispatcher(Dispatchers.DefaultDispatcherId()));
271
272         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
273
274         assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
275         assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().
276                 getShardElectionTimeoutFactor());
277         assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue().
278                 getShardElectionTimeoutFactor());
279
280         DatastoreContextFactory newMockFactory = newDatastoreContextFactory(
281                 datastoreContextBuilder.shardElectionTimeoutFactor(5).build());
282         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
283                 shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default");
284
285         Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()).
286                 shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology");
287
288         shardManager.tell(newMockFactory, kit.getRef());
289
290         DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class);
291         assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor());
292
293         newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class);
294         assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor());
295
296         defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
297         topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
298     }
299
300     @Test
301     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
302         new JavaTestKit(getSystem()) {{
303             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
304
305             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
306
307             shardManager.tell(new FindPrimary("non-existent", false), getRef());
308
309             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
310         }};
311     }
312
313     @Test
314     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
315         new JavaTestKit(getSystem()) {{
316             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
317
318             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
319
320             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
321             shardManager.tell(new ActorInitialized(), mockShardActor);
322
323             DataTree mockDataTree = mock(DataTree.class);
324             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
325                     DataStoreVersions.CURRENT_VERSION), getRef());
326
327             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
328             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
329                     RaftState.Leader.name())), mockShardActor);
330
331             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
332
333             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
334             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
335                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
336             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
337         }};
338     }
339
340     @Test
341     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
342         new JavaTestKit(getSystem()) {{
343             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
344
345             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
346             shardManager.tell(new ActorInitialized(), mockShardActor);
347
348             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
349             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
350             shardManager.tell(new RoleChangeNotification(memberId1,
351                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
352             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
353
354             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
355
356             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
357         }};
358     }
359
360     @Test
361     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
362         new JavaTestKit(getSystem()) {{
363             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
364
365             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
366             shardManager.tell(new ActorInitialized(), mockShardActor);
367
368             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
369             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
370
371             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
372             shardManager.tell(new RoleChangeNotification(memberId1,
373                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
374             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
375             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
376                     leaderVersion), mockShardActor);
377
378             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
379
380             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
381             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
382                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
383             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
384         }};
385     }
386
387     @Test
388     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
389         new JavaTestKit(getSystem()) {{
390             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
391
392             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
393
394             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
395         }};
396     }
397
398     @Test
399     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
400         new JavaTestKit(getSystem()) {{
401             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
402
403             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
404             shardManager.tell(new ActorInitialized(), mockShardActor);
405
406             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
407
408             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
409         }};
410     }
411
412     @Test
413     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
414         new JavaTestKit(getSystem()) {{
415             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
416
417             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
418             shardManager.tell(new ActorInitialized(), mockShardActor);
419
420             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
421             shardManager.tell(new RoleChangeNotification(memberId,
422                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
423
424             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
425
426             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
427
428             DataTree mockDataTree = mock(DataTree.class);
429             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
430                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
431
432             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
433
434             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
435             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
436                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
437             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
438         }};
439     }
440
441     @Test
442     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
443         new JavaTestKit(getSystem()) {{
444             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
445
446             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
447
448             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
449             // delayed until we send ActorInitialized and RoleChangeNotification.
450             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
451
452             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
453
454             shardManager.tell(new ActorInitialized(), mockShardActor);
455
456             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
457
458             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
459             shardManager.tell(new RoleChangeNotification(memberId,
460                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
461
462             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
463
464             DataTree mockDataTree = mock(DataTree.class);
465             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
466                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
467
468             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
469             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
470                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
471             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
472
473             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
474         }};
475     }
476
477     @Test
478     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
479         new JavaTestKit(getSystem()) {{
480             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
481
482             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
483
484             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
485
486             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
487
488             shardManager.tell(new ActorInitialized(), mockShardActor);
489
490             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
491         }};
492     }
493
494     @Test
495     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
496         new JavaTestKit(getSystem()) {{
497             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
498
499             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
500             shardManager.tell(new ActorInitialized(), mockShardActor);
501             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
502                     null, RaftState.Candidate.name()), mockShardActor);
503
504             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
505
506             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
507         }};
508     }
509
510     @Test
511     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
512         new JavaTestKit(getSystem()) {{
513             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
514
515             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
516             shardManager.tell(new ActorInitialized(), mockShardActor);
517             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
518                     null, RaftState.IsolatedLeader.name()), mockShardActor);
519
520             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
521
522             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
523         }};
524     }
525
526     @Test
527     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
528         new JavaTestKit(getSystem()) {{
529             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
530
531             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
532             shardManager.tell(new ActorInitialized(), mockShardActor);
533
534             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
535
536             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
537         }};
538     }
539
540     @Test
541     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
542         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
543
544         // Create an ActorSystem ShardManager actor for member-1.
545
546         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
547         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
548
549         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
550
551         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
552                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
553                         new MockConfiguration()), shardManagerID);
554
555         // Create an ActorSystem ShardManager actor for member-2.
556
557         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
558
559         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
560
561         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
562
563         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
564                 put("default", Arrays.asList("member-1", "member-2")).
565                 put("astronauts", Arrays.asList("member-2")).build());
566
567         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
568                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
569                         mockConfig2), shardManagerID);
570
571         new JavaTestKit(system1) {{
572
573             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
574             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
575
576             shardManager2.tell(new ActorInitialized(), mockShardActor2);
577
578             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
579             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
580             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
581                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
582             shardManager2.tell(new RoleChangeNotification(memberId2,
583                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
584
585             shardManager1.underlyingActor().waitForMemberUp();
586
587             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
588
589             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
590             String path = found.getPrimaryPath();
591             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
592             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
593
594             shardManager2.underlyingActor().verifyFindPrimary();
595
596             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
597
598             shardManager1.underlyingActor().waitForMemberRemoved();
599
600             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
601
602             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
603         }};
604
605         JavaTestKit.shutdownActorSystem(system1);
606         JavaTestKit.shutdownActorSystem(system2);
607     }
608
609     @Test
610     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
611         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
612
613         // Create an ActorSystem ShardManager actor for member-1.
614
615         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
616         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
617
618         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
619
620         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
621             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
622                 new MockConfiguration()), shardManagerID);
623
624         // Create an ActorSystem ShardManager actor for member-2.
625
626         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
627
628         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
629
630         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
631
632         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
633             put("default", Arrays.asList("member-1", "member-2")).build());
634
635         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
636             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
637                 mockConfig2), shardManagerID);
638
639         new JavaTestKit(system1) {{
640
641             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
642             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
643             shardManager1.tell(new ActorInitialized(), mockShardActor1);
644             shardManager2.tell(new ActorInitialized(), mockShardActor2);
645
646             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
647             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
648             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
649                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
650             shardManager1.tell(new RoleChangeNotification(memberId1,
651                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
652             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
653                     DataStoreVersions.CURRENT_VERSION),
654                 mockShardActor2);
655             shardManager2.tell(new RoleChangeNotification(memberId2,
656                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
657             shardManager1.underlyingActor().waitForMemberUp();
658
659             shardManager1.tell(new FindPrimary("default", true), getRef());
660
661             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
662             String path = found.getPrimaryPath();
663             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
664
665             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
666                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
667
668             shardManager1.underlyingActor().waitForUnreachableMember();
669
670             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
671             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
672             MessageCollectorActor.clearMessages(mockShardActor1);
673
674             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
675                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
676
677             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
678
679             shardManager1.tell(new FindPrimary("default", true), getRef());
680
681             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
682
683             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
684                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
685
686             shardManager1.underlyingActor().waitForReachableMember();
687
688             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
689             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
690             MessageCollectorActor.clearMessages(mockShardActor1);
691
692             shardManager1.tell(new FindPrimary("default", true), getRef());
693
694             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
695             String path1 = found1.getPrimaryPath();
696             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
697
698             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
699                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
700
701             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
702
703         }};
704
705         JavaTestKit.shutdownActorSystem(system1);
706         JavaTestKit.shutdownActorSystem(system2);
707     }
708
709     @Test
710     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
711         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
712
713         // Create an ActorSystem ShardManager actor for member-1.
714
715         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
716         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
717
718         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
719
720         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
721             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
722                 new MockConfiguration()), shardManagerID);
723
724         // Create an ActorSystem ShardManager actor for member-2.
725
726         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
727
728         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
729
730         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
731
732         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
733             put("default", Arrays.asList("member-1", "member-2")).build());
734
735         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
736             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
737                 mockConfig2), shardManagerID);
738
739         new JavaTestKit(system1) {{
740
741             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
742             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
743             shardManager1.tell(new ActorInitialized(), mockShardActor1);
744             shardManager2.tell(new ActorInitialized(), mockShardActor2);
745
746             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
747             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
748             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
749                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
750             shardManager1.tell(new RoleChangeNotification(memberId1,
751                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
752             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
753                     DataStoreVersions.CURRENT_VERSION),
754                 mockShardActor2);
755             shardManager2.tell(new RoleChangeNotification(memberId2,
756                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
757             shardManager1.underlyingActor().waitForMemberUp();
758
759             shardManager1.tell(new FindPrimary("default", true), getRef());
760
761             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
762             String path = found.getPrimaryPath();
763             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
764
765             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
766                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
767
768             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
769                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
770
771             shardManager1.underlyingActor().waitForUnreachableMember();
772
773             shardManager1.tell(new FindPrimary("default", true), getRef());
774
775             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
776
777             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
778
779             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
780                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
781             shardManager1.tell(new RoleChangeNotification(memberId1,
782                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
783
784             shardManager1.tell(new FindPrimary("default", true), getRef());
785
786             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
787             String path1 = found1.getPrimaryPath();
788             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
789
790         }};
791
792         JavaTestKit.shutdownActorSystem(system1);
793         JavaTestKit.shutdownActorSystem(system2);
794     }
795
796
797     @Test
798     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
799         new JavaTestKit(getSystem()) {{
800             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
801
802             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
803
804             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
805
806             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
807
808             assertEquals("getShardName", "non-existent", notFound.getShardName());
809         }};
810     }
811
812     @Test
813     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
814         new JavaTestKit(getSystem()) {{
815             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
816
817             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
818             shardManager.tell(new ActorInitialized(), mockShardActor);
819
820             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
821
822             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
823
824             assertTrue("Found path contains " + found.getPath().path().toString(),
825                     found.getPath().path().toString().contains("member-1-shard-default-config"));
826         }};
827     }
828
829     @Test
830     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
831         new JavaTestKit(getSystem()) {{
832             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
833
834             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
835
836             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
837         }};
838     }
839
840     @Test
841     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
842         new JavaTestKit(getSystem()) {{
843             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
844
845             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
846
847             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
848             // delayed until we send ActorInitialized.
849             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
850                     new Timeout(5, TimeUnit.SECONDS));
851
852             shardManager.tell(new ActorInitialized(), mockShardActor);
853
854             Object resp = Await.result(future, duration("5 seconds"));
855             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
856         }};
857     }
858
859     @Test
860     public void testOnRecoveryJournalIsCleaned() {
861         String persistenceID = "shard-manager-" + shardMrgIDSuffix;
862         InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
863                 ImmutableSet.of("foo")));
864         InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
865                 ImmutableSet.of("bar")));
866         InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
867
868         TestShardManager shardManager = newTestShardManager();
869
870         InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
871
872         // Journal entries up to the last one should've been deleted
873         Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
874         synchronized (journal) {
875             assertEquals("Journal size", 0, journal.size());
876         }
877     }
878
879     @Test
880     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
881         TestShardManager shardManager = newTestShardManager();
882
883         String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
884         shardManager.onReceiveCommand(new RoleChangeNotification(
885                 memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
886
887         verify(ready, never()).countDown();
888
889         shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
890                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
891
892         verify(ready, times(1)).countDown();
893     }
894
895     @Test
896     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
897         new JavaTestKit(getSystem()) {{
898             TestShardManager shardManager = newTestShardManager();
899
900             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
901             shardManager.onReceiveCommand(new RoleChangeNotification(
902                     memberId, null, RaftState.Follower.name()));
903
904             verify(ready, never()).countDown();
905
906             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
907
908             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
909                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
910                     DataStoreVersions.CURRENT_VERSION));
911
912             verify(ready, times(1)).countDown();
913         }};
914     }
915
916     @Test
917     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
918         new JavaTestKit(getSystem()) {{
919             TestShardManager shardManager = newTestShardManager();
920
921             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
922             shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
923
924             verify(ready, never()).countDown();
925
926             shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId,
927                     "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
928                     DataStoreVersions.CURRENT_VERSION));
929
930             shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
931
932             verify(ready, times(1)).countDown();
933         }};
934     }
935
936     @Test
937     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
938         TestShardManager shardManager = newTestShardManager();
939
940         shardManager.onReceiveCommand(new RoleChangeNotification(
941                 "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
942
943         verify(ready, never()).countDown();
944     }
945
946     @Test
947     public void testByDefaultSyncStatusIsFalse() throws Exception{
948         TestShardManager shardManager = newTestShardManager();
949
950         assertEquals(false, shardManager.getMBean().getSyncStatus());
951     }
952
953     @Test
954     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
955         TestShardManager shardManager = newTestShardManager();
956
957         shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
958                 RaftState.Follower.name(), RaftState.Leader.name()));
959
960         assertEquals(true, shardManager.getMBean().getSyncStatus());
961     }
962
963     @Test
964     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
965         TestShardManager shardManager = newTestShardManager();
966
967         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
968         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
969                 RaftState.Follower.name(), RaftState.Candidate.name()));
970
971         assertEquals(false, shardManager.getMBean().getSyncStatus());
972
973         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
974         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
975                 true, shardId));
976
977         assertEquals(false, shardManager.getMBean().getSyncStatus());
978     }
979
980     @Test
981     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
982         TestShardManager shardManager = newTestShardManager();
983
984         String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
985         shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
986                 RaftState.Candidate.name(), RaftState.Follower.name()));
987
988         // Initially will be false
989         assertEquals(false, shardManager.getMBean().getSyncStatus());
990
991         // Send status true will make sync status true
992         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
993
994         assertEquals(true, shardManager.getMBean().getSyncStatus());
995
996         // Send status false will make sync status false
997         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
998
999         assertEquals(false, shardManager.getMBean().getSyncStatus());
1000
1001     }
1002
1003     @Test
1004     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
1005         TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() {
1006             @Override
1007             public List<String> getMemberShardNames(String memberName) {
1008                 return Arrays.asList("default", "astronauts");
1009             }
1010         }));
1011
1012         // Initially will be false
1013         assertEquals(false, shardManager.getMBean().getSyncStatus());
1014
1015         // Make default shard leader
1016         String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
1017         shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
1018                 RaftState.Follower.name(), RaftState.Leader.name()));
1019
1020         // default = Leader, astronauts is unknown so sync status remains false
1021         assertEquals(false, shardManager.getMBean().getSyncStatus());
1022
1023         // Make astronauts shard leader as well
1024         String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1025         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1026                 RaftState.Follower.name(), RaftState.Leader.name()));
1027
1028         // Now sync status should be true
1029         assertEquals(true, shardManager.getMBean().getSyncStatus());
1030
1031         // Make astronauts a Follower
1032         shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
1033                 RaftState.Leader.name(), RaftState.Follower.name()));
1034
1035         // Sync status is not true
1036         assertEquals(false, shardManager.getMBean().getSyncStatus());
1037
1038         // Make the astronauts follower sync status true
1039         shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
1040
1041         // Sync status is now true
1042         assertEquals(true, shardManager.getMBean().getSyncStatus());
1043
1044     }
1045
1046     @Test
1047     public void testOnReceiveSwitchShardBehavior() throws Exception {
1048         new JavaTestKit(getSystem()) {{
1049             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
1050
1051             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1052             shardManager.tell(new ActorInitialized(), mockShardActor);
1053
1054             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
1055
1056             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
1057
1058             assertEquals(RaftState.Leader, switchBehavior.getNewState());
1059             assertEquals(1000, switchBehavior.getNewTerm());
1060         }};
1061     }
1062
1063     @Test
1064     public void testOnCreateShard() {
1065         new JavaTestKit(getSystem()) {{
1066             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1067
1068             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1069                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1070
1071             SchemaContext schemaContext = TestModel.createTestContext();
1072             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1073
1074             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
1075                     persistent(false).build();
1076             Shard.Builder shardBuilder = Shard.builder();
1077
1078             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1079                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
1080             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
1081
1082             expectMsgClass(duration("5 seconds"), Success.class);
1083
1084             shardManager.tell(new FindLocalShard("foo", true), getRef());
1085
1086             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1087
1088             assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
1089             assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
1090                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
1091             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
1092                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1093                     shardBuilder.getPeerAddresses().keySet());
1094             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1095                     shardBuilder.getId());
1096             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1097
1098             // Send CreateShard with same name - should return Success with a message.
1099
1100             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1101
1102             Success success = expectMsgClass(duration("5 seconds"), Success.class);
1103             assertNotNull("Success status is null", success.status());
1104         }};
1105     }
1106
1107     @Test
1108     public void testOnCreateShardWithLocalMemberNotInShardConfig() {
1109         new JavaTestKit(getSystem()) {{
1110             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
1111
1112             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1113                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1114
1115             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1116
1117             Shard.Builder shardBuilder = Shard.builder();
1118             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1119                     "foo", null, Arrays.asList("member-5", "member-6"));
1120
1121             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1122             expectMsgClass(duration("5 seconds"), Success.class);
1123
1124             shardManager.tell(new FindLocalShard("foo", true), getRef());
1125             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1126
1127             assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
1128             assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
1129                     shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
1130         }};
1131     }
1132
1133     @Test
1134     public void testOnCreateShardWithNoInitialSchemaContext() {
1135         new JavaTestKit(getSystem()) {{
1136             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1137                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1138
1139             Shard.Builder shardBuilder = Shard.builder();
1140
1141             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1142                     "foo", null, Arrays.asList("member-1"));
1143             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
1144
1145             expectMsgClass(duration("5 seconds"), Success.class);
1146
1147             SchemaContext schemaContext = TestModel.createTestContext();
1148             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1149
1150             shardManager.tell(new FindLocalShard("foo", true), getRef());
1151
1152             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1153
1154             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
1155             assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
1156         }};
1157     }
1158
1159     @Test
1160     public void testGetSnapshot() throws Throwable {
1161         JavaTestKit kit = new JavaTestKit(getSystem());
1162
1163         MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1164                    put("shard1", Arrays.asList("member-1")).
1165                    put("shard2", Arrays.asList("member-1")).build());
1166
1167         ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher(
1168                 Dispatchers.DefaultDispatcherId()));
1169
1170         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1171         Failure failure = kit.expectMsgClass(Failure.class);
1172         assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass());
1173
1174         kit = new JavaTestKit(getSystem());
1175
1176         shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
1177
1178         shardManager.tell(new FindLocalShard("shard1", true), kit.getRef());
1179         kit.expectMsgClass(LocalShardFound.class);
1180         shardManager.tell(new FindLocalShard("shard2", true), kit.getRef());
1181         kit.expectMsgClass(LocalShardFound.class);
1182
1183         shardManager.tell(GetSnapshot.INSTANCE, kit.getRef());
1184
1185         DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
1186
1187         assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
1188         List<ShardSnapshot> shardSnapshots = datastoreSnapshot.getShardSnapshots();
1189         Set<String> actualShardNames = new HashSet<>();
1190         for(ShardSnapshot s: shardSnapshots) {
1191             actualShardNames.add(s.getName());
1192         }
1193
1194         assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames);
1195
1196         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
1197     }
1198
1199     @Test
1200     public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
1201         new JavaTestKit(getSystem()) {{
1202             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1203                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1204
1205             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1206             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1207
1208             assertEquals("Failure obtained", true,
1209                           (resp.cause() instanceof IllegalArgumentException));
1210         }};
1211     }
1212
1213     @Test
1214     public void testAddShardReplica() throws Exception {
1215         MockConfiguration mockConfig =
1216                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1217                    put("default", Arrays.asList("member-1", "member-2")).
1218                    put("astronauts", Arrays.asList("member-2")).build());
1219
1220         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1221
1222         // Create an ActorSystem ShardManager actor for member-1.
1223         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1224         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1225         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1226         final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1227                 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1228                    new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1229
1230         // Create an ActorSystem ShardManager actor for member-2.
1231         final ActorSystem system2 = ActorSystem.create("cluster-test",
1232             ConfigFactory.load().getConfig("Member2"));
1233         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1234
1235         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1236         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1237             TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1238         final TestActorRef<ForwardingShardManager> leaderShardManager = TestActorRef.create(system2,
1239                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor,
1240                         new ClusterWrapperImpl(system2), mockConfig), shardManagerID);
1241
1242         new JavaTestKit(system1) {{
1243
1244             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1245             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1246
1247             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1248
1249             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1250             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1251             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1252                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1253             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1254                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1255
1256             newReplicaShardManager.underlyingActor().waitForMemberUp();
1257             leaderShardManager.underlyingActor().waitForMemberUp();
1258
1259             //construct a mock response message
1260             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1261             mockShardLeaderActor.underlyingActor().updateResponse(response);
1262             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1263             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1264                 AddServer.class);
1265             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1266             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1267             newReplicaShardManager.underlyingActor()
1268                 .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
1269             expectMsgClass(duration("5 seconds"), Status.Success.class);
1270         }};
1271
1272         JavaTestKit.shutdownActorSystem(system1);
1273         JavaTestKit.shutdownActorSystem(system2);
1274     }
1275
1276     @Test
1277     public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
1278         new JavaTestKit(getSystem()) {{
1279             TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
1280                     newPropsShardMgrWithMockShardActor(), shardMgrID);
1281
1282             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1283             shardManager.tell(new ActorInitialized(), mockShardActor);
1284
1285             String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
1286             AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
1287             ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
1288                     Props.create(MockRespondActor.class, addServerReply), leaderId);
1289
1290             MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
1291
1292             String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
1293             shardManager.tell(new RoleChangeNotification(newReplicaId,
1294                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
1295             shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
1296                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
1297
1298             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1299
1300             MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
1301
1302             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1303             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1304
1305             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1306             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1307
1308             // Send message again to verify previous in progress state is cleared
1309
1310             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1311             resp = expectMsgClass(duration("5 seconds"), Failure.class);
1312             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1313
1314             // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
1315
1316             shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
1317                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
1318             leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
1319             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1320             expectMsgClass(duration("5 seconds"), Failure.class);
1321
1322             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1323             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1324
1325             leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
1326         }};
1327     }
1328
1329     @Test
1330     public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
1331         new JavaTestKit(getSystem()) {{
1332             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
1333             ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
1334
1335             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1336             shardManager.tell(new ActorInitialized(), mockShardActor);
1337             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
1338                     DataStoreVersions.CURRENT_VERSION), getRef());
1339             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
1340                     RaftState.Leader.name())), mockShardActor);
1341
1342             shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
1343             Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
1344             assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
1345
1346             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
1347             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1348         }};
1349     }
1350
1351     @Test
1352     public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
1353         new JavaTestKit(getSystem()) {{
1354             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1355
1356             MockConfiguration mockConfig =
1357                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1358                        put("astronauts", Arrays.asList("member-2")).build());
1359
1360             ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
1361             TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
1362                     newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockNewReplicaShardActor,
1363                             new MockClusterWrapper(), mockConfig), shardMgrID);
1364             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1365
1366             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1367
1368             JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
1369             terminateWatcher.watch(mockNewReplicaShardActor);
1370
1371             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1372
1373             AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
1374             assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
1375                     addServerMsg.getNewServerId());
1376             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
1377
1378             Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
1379             assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
1380
1381             shardManager.tell(new FindLocalShard("astronauts", false), getRef());
1382             expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1383
1384             terminateWatcher.expectTerminated(mockNewReplicaShardActor);
1385
1386             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1387             mockShardLeaderKit.expectMsgClass(AddServer.class);
1388             mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
1389             failure = expectMsgClass(duration("5 seconds"), Failure.class);
1390             assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
1391         }};
1392     }
1393
1394     @Test
1395     public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
1396         new JavaTestKit(getSystem()) {{
1397             JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
1398             JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
1399
1400             MockConfiguration mockConfig =
1401                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1402                        put("astronauts", Arrays.asList("member-2")).build());
1403
1404             TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
1405                     newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockShardActor,
1406                             new MockClusterWrapper(), mockConfig), shardMgrID);
1407             shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
1408
1409             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1410
1411             shardManager.tell(new AddShardReplica("astronauts"), getRef());
1412
1413             mockShardLeaderKit.expectMsgClass(AddServer.class);
1414
1415             shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
1416
1417             secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
1418         }};
1419     }
1420
1421     @Test
1422     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1423         new JavaTestKit(getSystem()) {{
1424             MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1425                        put("astronauts", Arrays.asList("member-2")).build());
1426
1427             ActorRef newReplicaShardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(
1428                     "shardManager", mockShardActor, new MockClusterWrapper(), mockConfig));
1429
1430             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1431             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1432
1433             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1434             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1435             assertEquals("Failure obtained", true,
1436                           (resp.cause() instanceof RuntimeException));
1437         }};
1438     }
1439
1440     @Test
1441     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1442         new JavaTestKit(getSystem()) {{
1443             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1444                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1445
1446             shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
1447             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1448             assertEquals("Failure obtained", true,
1449                          (resp.cause() instanceof IllegalArgumentException));
1450         }};
1451
1452     }
1453
1454     @Test
1455     public void testServerRemovedShardActorNotRunning() throws Exception {
1456         new JavaTestKit(getSystem()) {{
1457             MockConfiguration mockConfig =
1458                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1459                             put("default", Arrays.asList("member-1", "member-2")).
1460                             put("astronauts", Arrays.asList("member-2")).
1461                             put("people", Arrays.asList("member-1", "member-2")).build());
1462
1463             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1464                     newShardMgrProps(mockConfig));
1465
1466             shardManager.underlyingActor().waitForRecoveryComplete();
1467
1468             shardManager.tell(new FindLocalShard("people", false), getRef());
1469             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1470
1471             shardManager.tell(new FindLocalShard("default", false), getRef());
1472             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1473
1474             // Removed the default shard replica from member-1
1475             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1476             final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
1477             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1478
1479             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1480         }};
1481     }
1482
1483     @Test
1484     public void testServerRemovedShardActorRunning() throws Exception {
1485         new JavaTestKit(getSystem()) {{
1486             MockConfiguration mockConfig =
1487                     new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1488                             put("default", Arrays.asList("member-1", "member-2")).
1489                             put("astronauts", Arrays.asList("member-2")).
1490                             put("people", Arrays.asList("member-1", "member-2")).build());
1491
1492             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
1493                     newShardMgrProps(mockConfig));
1494
1495             TestActorRef<MessageCollectorActor> shard = TestActorRef.create(getSystem(), MessageCollectorActor.props());
1496
1497             watch(shard);
1498
1499             shardManager.underlyingActor().waitForRecoveryComplete();
1500
1501             shardManager.underlyingActor().addShardActor("default", shard);
1502
1503             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1504
1505             shardManager.tell(new FindLocalShard("people", false), getRef());
1506             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1507
1508             shardManager.tell(new FindLocalShard("default", false), getRef());
1509             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1510
1511             // Removed the default shard replica from member-1
1512             ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
1513             final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build();
1514             shardManager.tell(new ServerRemoved(shardId.toString()), getRef());
1515
1516             shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people"));
1517
1518             expectMsgClass(duration("5 seconds"), Terminated.class);
1519         }};
1520     }
1521
1522
1523     @Test
1524     public void testShardPersistenceWithRestoredData() throws Exception {
1525         new JavaTestKit(getSystem()) {{
1526             MockConfiguration mockConfig =
1527                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1528                    put("default", Arrays.asList("member-1", "member-2")).
1529                    put("astronauts", Arrays.asList("member-2")).
1530                    put("people", Arrays.asList("member-1", "member-2")).build());
1531             String[] restoredShards = {"default", "astronauts"};
1532             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
1533             InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
1534
1535             //create shardManager to come up with restored data
1536             TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
1537                     newShardMgrProps(mockConfig));
1538
1539             newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
1540
1541             newRestoredShardManager.tell(new FindLocalShard("people", false), getRef());
1542             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
1543             assertEquals("for uninitialized shard", "people", notFound.getShardName());
1544
1545             //Verify a local shard is created for the restored shards,
1546             //although we expect a NotInitializedException for the shards as the actor initialization
1547             //message is not sent for them
1548             newRestoredShardManager.tell(new FindLocalShard("default", false), getRef());
1549             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1550
1551             newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef());
1552             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
1553         }};
1554     }
1555
1556
1557     private static class TestShardManager extends ShardManager {
1558         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1559         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1560         private ShardManagerSnapshot snapshot;
1561         private Map<String, ActorRef> shardActors = new HashMap<>();
1562
1563         private TestShardManager(Builder builder) {
1564             super(builder);
1565         }
1566
1567         @Override
1568         public void handleRecover(Object message) throws Exception {
1569             try {
1570                 super.handleRecover(message);
1571             } finally {
1572                 if(message instanceof RecoveryCompleted) {
1573                     recoveryComplete.countDown();
1574                 }
1575             }
1576         }
1577
1578         void waitForRecoveryComplete() {
1579             assertEquals("Recovery complete", true,
1580                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1581         }
1582
1583         public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) {
1584             return new Builder(datastoreContextBuilder);
1585         }
1586
1587         private static class Builder extends ShardManager.Builder {
1588             Builder(DatastoreContext.Builder datastoreContextBuilder) {
1589                 cluster(new MockClusterWrapper()).configuration(new MockConfiguration());
1590                 datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build()));
1591                 waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache());
1592             }
1593
1594             @Override
1595             public Props props() {
1596                 verify();
1597                 return Props.create(TestShardManager.class, this);
1598             }
1599         }
1600
1601         @Override
1602         public void saveSnapshot(Object obj) {
1603             snapshot = (ShardManagerSnapshot) obj;
1604             snapshotPersist.countDown();
1605         }
1606
1607         void verifySnapshotPersisted(Set<String> shardList) {
1608             assertEquals("saveSnapshot invoked", true,
1609                     Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
1610             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
1611         }
1612
1613         @Override
1614         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1615             if(shardActors.get(info.getShardName()) != null){
1616                 return shardActors.get(info.getShardName());
1617             }
1618             return super.newShardActor(schemaContext, info);
1619         }
1620
1621         public void addShardActor(String shardName, ActorRef actorRef){
1622             shardActors.put(shardName, actorRef);
1623         }
1624     }
1625
1626     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1627         private static final long serialVersionUID = 1L;
1628         private final Creator<ShardManager> delegate;
1629
1630         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1631             this.delegate = delegate;
1632         }
1633
1634         @Override
1635         public ShardManager create() throws Exception {
1636             return delegate.create();
1637         }
1638     }
1639
1640     interface MessageInterceptor extends Function<Object, Object> {
1641         boolean canIntercept(Object message);
1642     }
1643
1644     private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
1645         return new MessageInterceptor(){
1646             @Override
1647             public Object apply(Object message) {
1648                 return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
1649             }
1650
1651             @Override
1652             public boolean canIntercept(Object message) {
1653                 return message instanceof FindPrimary;
1654             }
1655         };
1656     }
1657
1658     private static class ForwardingShardManager extends ShardManager {
1659         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1660         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1661         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1662         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1663         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1664         private final ActorRef shardActor;
1665         private final String name;
1666         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
1667         private ShardManagerSnapshot snapshot;
1668         private volatile MessageInterceptor messageInterceptor;
1669
1670         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
1671             super(builder);
1672             this.shardActor = shardActor;
1673             this.name = name;
1674         }
1675
1676         void setMessageInterceptor(MessageInterceptor messageInterceptor) {
1677             this.messageInterceptor = messageInterceptor;
1678         }
1679
1680
1681         @Override
1682         public void handleCommand(Object message) throws Exception {
1683             try{
1684                 if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
1685                     getSender().tell(messageInterceptor.apply(message), getSelf());
1686                 } else {
1687                     super.handleCommand(message);
1688                 }
1689             } finally {
1690                 if(message instanceof FindPrimary) {
1691                     findPrimaryMessageReceived.countDown();
1692                 } else if(message instanceof ClusterEvent.MemberUp) {
1693                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1694                     if(!getCluster().getCurrentMemberName().equals(role)) {
1695                         memberUpReceived.countDown();
1696                     }
1697                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1698                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1699                     if(!getCluster().getCurrentMemberName().equals(role)) {
1700                         memberRemovedReceived.countDown();
1701                     }
1702                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1703                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1704                     if(!getCluster().getCurrentMemberName().equals(role)) {
1705                         memberUnreachableReceived.countDown();
1706                     }
1707                 } else if(message instanceof ClusterEvent.ReachableMember) {
1708                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1709                     if(!getCluster().getCurrentMemberName().equals(role)) {
1710                         memberReachableReceived.countDown();
1711                     }
1712                 }
1713             }
1714         }
1715
1716         @Override
1717         public String persistenceId() {
1718             return name;
1719         }
1720
1721         @Override
1722         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1723             return shardActor;
1724         }
1725
1726         void waitForMemberUp() {
1727             assertEquals("MemberUp received", true,
1728                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1729             memberUpReceived = new CountDownLatch(1);
1730         }
1731
1732         void waitForMemberRemoved() {
1733             assertEquals("MemberRemoved received", true,
1734                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1735             memberRemovedReceived = new CountDownLatch(1);
1736         }
1737
1738         void waitForUnreachableMember() {
1739             assertEquals("UnreachableMember received", true,
1740                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1741                 ));
1742             memberUnreachableReceived = new CountDownLatch(1);
1743         }
1744
1745         void waitForReachableMember() {
1746             assertEquals("ReachableMember received", true,
1747                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1748             memberReachableReceived = new CountDownLatch(1);
1749         }
1750
1751         void verifyFindPrimary() {
1752             assertEquals("FindPrimary received", true,
1753                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1754             findPrimaryMessageReceived = new CountDownLatch(1);
1755         }
1756
1757         @Override
1758         public void saveSnapshot(Object obj) {
1759             snapshot = (ShardManagerSnapshot) obj;
1760             snapshotPersist.countDown();
1761         }
1762
1763         void verifySnapshotPersisted(Set<String> shardList) {
1764             assertEquals("saveSnapshot invoked", true,
1765                 Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
1766             assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
1767         }
1768     }
1769
1770     private static class MockRespondActor extends MessageCollectorActor {
1771         static final String CLEAR_RESPONSE = "clear-response";
1772
1773         private volatile Object responseMsg;
1774
1775         @SuppressWarnings("unused")
1776         public MockRespondActor() {
1777         }
1778
1779         @SuppressWarnings("unused")
1780         public MockRespondActor(Object responseMsg) {
1781             this.responseMsg = responseMsg;
1782         }
1783
1784         public void updateResponse(Object response) {
1785             responseMsg = response;
1786         }
1787
1788         @Override
1789         public void onReceive(Object message) throws Exception {
1790             super.onReceive(message);
1791             if (message instanceof AddServer) {
1792                 if (responseMsg != null) {
1793                     getSender().tell(responseMsg, getSelf());
1794                 }
1795             } if(message.equals(CLEAR_RESPONSE)) {
1796                 responseMsg = null;
1797             }
1798         }
1799     }
1800 }