BUG 2187 - Creating ShardReplica
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardManagerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.AddressFromURIString;
23 import akka.actor.Props;
24 import akka.actor.Status;
25 import akka.cluster.Cluster;
26 import akka.cluster.ClusterEvent;
27 import akka.dispatch.Dispatchers;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.RecoveryCompleted;
31 import akka.testkit.JavaTestKit;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Optional;
35 import com.google.common.collect.ImmutableMap;
36 import com.google.common.collect.ImmutableSet;
37 import com.google.common.collect.Sets;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import com.typesafe.config.ConfigFactory;
40 import java.net.URI;
41 import java.util.Arrays;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import org.junit.After;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.opendaylight.controller.cluster.datastore.config.Configuration;
52 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
53 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
54 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
59 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
60 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
61 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
64 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
65 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
66 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
68 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
69 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
71 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
72 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
78 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
79 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
80 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
83 import org.opendaylight.controller.cluster.raft.RaftState;
84 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
85 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
86 import org.opendaylight.controller.cluster.raft.messages.AddServer;
87 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
88 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
91 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
92 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
93 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
94 import scala.concurrent.Await;
95 import scala.concurrent.Future;
96 import scala.concurrent.duration.FiniteDuration;
97
98 public class ShardManagerTest extends AbstractActorTest {
99     private static int ID_COUNTER = 1;
100
101     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
102     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
103
104     @Mock
105     private static CountDownLatch ready;
106
107     private static TestActorRef<MessageCollectorActor> mockShardActor;
108
109     private static String mockShardName;
110
111     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
112             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
113                    .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
114
115     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
116         String name = new ShardIdentifier(shardName, memberName,"config").toString();
117         return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
118     }
119
120     private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
121
122     @Before
123     public void setUp() {
124         MockitoAnnotations.initMocks(this);
125
126         InMemoryJournal.clear();
127
128         if(mockShardActor == null) {
129             mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
130             mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName);
131         }
132
133         mockShardActor.underlyingActor().clear();
134     }
135
136     @After
137     public void tearDown() {
138         InMemoryJournal.clear();
139     }
140
141     private Props newShardMgrProps() {
142         return newShardMgrProps(new MockConfiguration());
143     }
144
145     private Props newShardMgrProps(Configuration config) {
146         return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready,
147                 primaryShardInfoCache);
148     }
149
150     private Props newPropsShardMgrWithMockShardActor() {
151         return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
152                 new MockConfiguration());
153     }
154
155     private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
156             final ClusterWrapper clusterWrapper, final Configuration config) {
157         Creator<ShardManager> creator = new Creator<ShardManager>() {
158             private static final long serialVersionUID = 1L;
159             @Override
160             public ShardManager create() throws Exception {
161                 return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
162                         ready, name, shardActor, primaryShardInfoCache);
163             }
164         };
165
166         return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
167     }
168
169     @Test
170     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
171         new JavaTestKit(getSystem()) {{
172             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
173
174             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
175
176             shardManager.tell(new FindPrimary("non-existent", false), getRef());
177
178             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
179         }};
180     }
181
182     @Test
183     public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
184         new JavaTestKit(getSystem()) {{
185             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
186
187             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
188
189             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
190             shardManager.tell(new ActorInitialized(), mockShardActor);
191
192             DataTree mockDataTree = mock(DataTree.class);
193             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
194                     DataStoreVersions.CURRENT_VERSION), getRef());
195
196             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
197             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
198                     RaftState.Leader.name())), mockShardActor);
199
200             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
201
202             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
203             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
204                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
205             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
206         }};
207     }
208
209     @Test
210     public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
211         new JavaTestKit(getSystem()) {{
212             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
213
214             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
215             shardManager.tell(new ActorInitialized(), mockShardActor);
216
217             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
218             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
219             shardManager.tell(new RoleChangeNotification(memberId1,
220                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
221             shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor);
222
223             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
224
225             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
226         }};
227     }
228
229     @Test
230     public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
231         new JavaTestKit(getSystem()) {{
232             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
233
234             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
235             shardManager.tell(new ActorInitialized(), mockShardActor);
236
237             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
238             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
239
240             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
241             shardManager.tell(new RoleChangeNotification(memberId1,
242                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
243             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
244             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
245                     leaderVersion), mockShardActor);
246
247             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
248
249             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
250             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
251                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
252             assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
253         }};
254     }
255
256     @Test
257     public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
258         new JavaTestKit(getSystem()) {{
259             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
260
261             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
262
263             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
264         }};
265     }
266
267     @Test
268     public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
269         new JavaTestKit(getSystem()) {{
270             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
271
272             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
273             shardManager.tell(new ActorInitialized(), mockShardActor);
274
275             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
276
277             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
278         }};
279     }
280
281     @Test
282     public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
283         new JavaTestKit(getSystem()) {{
284             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
285
286             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
287             shardManager.tell(new ActorInitialized(), mockShardActor);
288
289             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
290             shardManager.tell(new RoleChangeNotification(memberId,
291                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
292
293             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
294
295             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
296
297             DataTree mockDataTree = mock(DataTree.class);
298             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
299                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
300
301             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
302
303             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
304             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
305                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
306             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
307         }};
308     }
309
310     @Test
311     public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
312         new JavaTestKit(getSystem()) {{
313             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
314
315             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
316
317             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
318             // delayed until we send ActorInitialized and RoleChangeNotification.
319             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
320
321             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
322
323             shardManager.tell(new ActorInitialized(), mockShardActor);
324
325             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
326
327             String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
328             shardManager.tell(new RoleChangeNotification(memberId,
329                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
330
331             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
332
333             DataTree mockDataTree = mock(DataTree.class);
334             shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree),
335                     DataStoreVersions.CURRENT_VERSION), mockShardActor);
336
337             LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
338             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
339                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
340             assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
341
342             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
343         }};
344     }
345
346     @Test
347     public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
348         new JavaTestKit(getSystem()) {{
349             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
350
351             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
352
353             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
354
355             expectMsgClass(duration("2 seconds"), NotInitializedException.class);
356
357             shardManager.tell(new ActorInitialized(), mockShardActor);
358
359             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
360         }};
361     }
362
363     @Test
364     public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
365         new JavaTestKit(getSystem()) {{
366             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
367
368             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
369             shardManager.tell(new ActorInitialized(), mockShardActor);
370             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
371                     null, RaftState.Candidate.name()), mockShardActor);
372
373             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
374
375             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
376         }};
377     }
378
379     @Test
380     public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
381         new JavaTestKit(getSystem()) {{
382             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
383
384             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
385             shardManager.tell(new ActorInitialized(), mockShardActor);
386             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
387                     null, RaftState.IsolatedLeader.name()), mockShardActor);
388
389             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
390
391             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
392         }};
393     }
394
395     @Test
396     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
397         new JavaTestKit(getSystem()) {{
398             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
399
400             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
401             shardManager.tell(new ActorInitialized(), mockShardActor);
402
403             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
404
405             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
406         }};
407     }
408
409     @Test
410     public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
411         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
412
413         // Create an ActorSystem ShardManager actor for member-1.
414
415         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
416         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
417
418         ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
419
420         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
421                 newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
422                         new MockConfiguration()), shardManagerID);
423
424         // Create an ActorSystem ShardManager actor for member-2.
425
426         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
427
428         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
429
430         final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
431
432         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
433                 put("default", Arrays.asList("member-1", "member-2")).
434                 put("astronauts", Arrays.asList("member-2")).build());
435
436         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
437                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
438                         mockConfig2), shardManagerID);
439
440         new JavaTestKit(system1) {{
441
442             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
443             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
444
445             shardManager2.tell(new ActorInitialized(), mockShardActor2);
446
447             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
448             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
449             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
450                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
451             shardManager2.tell(new RoleChangeNotification(memberId2,
452                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
453
454             shardManager1.underlyingActor().waitForMemberUp();
455
456             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
457
458             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
459             String path = found.getPrimaryPath();
460             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
461             assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
462
463             shardManager2.underlyingActor().verifyFindPrimary();
464
465             Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
466
467             shardManager1.underlyingActor().waitForMemberRemoved();
468
469             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
470
471             expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
472         }};
473
474         JavaTestKit.shutdownActorSystem(system1);
475         JavaTestKit.shutdownActorSystem(system2);
476     }
477
478     @Test
479     public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception {
480         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
481
482         // Create an ActorSystem ShardManager actor for member-1.
483
484         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
485         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
486
487         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
488
489         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
490             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
491                 new MockConfiguration()), shardManagerID);
492
493         // Create an ActorSystem ShardManager actor for member-2.
494
495         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
496
497         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
498
499         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
500
501         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
502             put("default", Arrays.asList("member-1", "member-2")).build());
503
504         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
505             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
506                 mockConfig2), shardManagerID);
507
508         new JavaTestKit(system1) {{
509
510             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
511             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
512             shardManager1.tell(new ActorInitialized(), mockShardActor1);
513             shardManager2.tell(new ActorInitialized(), mockShardActor2);
514
515             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
516             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
517             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
518                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
519             shardManager1.tell(new RoleChangeNotification(memberId1,
520                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
521             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
522                     DataStoreVersions.CURRENT_VERSION),
523                 mockShardActor2);
524             shardManager2.tell(new RoleChangeNotification(memberId2,
525                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
526             shardManager1.underlyingActor().waitForMemberUp();
527
528             shardManager1.tell(new FindPrimary("default", true), getRef());
529
530             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
531             String path = found.getPrimaryPath();
532             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
533
534             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
535                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
536
537             shardManager1.underlyingActor().waitForUnreachableMember();
538
539             PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
540             assertEquals("getMemberName", "member-2", peerDown.getMemberName());
541             MessageCollectorActor.clearMessages(mockShardActor1);
542
543             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
544                     createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
545
546             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
547
548             shardManager1.tell(new FindPrimary("default", true), getRef());
549
550             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
551
552             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
553                 createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
554
555             shardManager1.underlyingActor().waitForReachableMember();
556
557             PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
558             assertEquals("getMemberName", "member-2", peerUp.getMemberName());
559             MessageCollectorActor.clearMessages(mockShardActor1);
560
561             shardManager1.tell(new FindPrimary("default", true), getRef());
562
563             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
564             String path1 = found1.getPrimaryPath();
565             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
566
567             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
568                     createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
569
570             MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
571
572         }};
573
574         JavaTestKit.shutdownActorSystem(system1);
575         JavaTestKit.shutdownActorSystem(system2);
576     }
577
578     @Test
579     public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception {
580         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
581
582         // Create an ActorSystem ShardManager actor for member-1.
583
584         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
585         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
586
587         final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
588
589         final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
590             newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
591                 new MockConfiguration()), shardManagerID);
592
593         // Create an ActorSystem ShardManager actor for member-2.
594
595         final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
596
597         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
598
599         final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
600
601         MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
602             put("default", Arrays.asList("member-1", "member-2")).build());
603
604         final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
605             newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
606                 mockConfig2), shardManagerID);
607
608         new JavaTestKit(system1) {{
609
610             shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
611             shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
612             shardManager1.tell(new ActorInitialized(), mockShardActor1);
613             shardManager2.tell(new ActorInitialized(), mockShardActor2);
614
615             String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
616             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
617             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2,
618                 Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1);
619             shardManager1.tell(new RoleChangeNotification(memberId1,
620                 RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1);
621             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)),
622                     DataStoreVersions.CURRENT_VERSION),
623                 mockShardActor2);
624             shardManager2.tell(new RoleChangeNotification(memberId2,
625                 RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
626             shardManager1.underlyingActor().waitForMemberUp();
627
628             shardManager1.tell(new FindPrimary("default", true), getRef());
629
630             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
631             String path = found.getPrimaryPath();
632             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
633
634             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
635                     mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
636
637             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
638                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
639
640             shardManager1.underlyingActor().waitForUnreachableMember();
641
642             shardManager1.tell(new FindPrimary("default", true), getRef());
643
644             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
645
646             assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default"));
647
648             shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)),
649                     DataStoreVersions.CURRENT_VERSION), mockShardActor1);
650             shardManager1.tell(new RoleChangeNotification(memberId1,
651                 RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1);
652
653             shardManager1.tell(new FindPrimary("default", true), getRef());
654
655             LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
656             String path1 = found1.getPrimaryPath();
657             assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
658
659         }};
660
661         JavaTestKit.shutdownActorSystem(system1);
662         JavaTestKit.shutdownActorSystem(system2);
663     }
664
665
666     @Test
667     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
668         new JavaTestKit(getSystem()) {{
669             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
670
671             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
672
673             shardManager.tell(new FindLocalShard("non-existent", false), getRef());
674
675             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
676
677             assertEquals("getShardName", "non-existent", notFound.getShardName());
678         }};
679     }
680
681     @Test
682     public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
683         new JavaTestKit(getSystem()) {{
684             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
685
686             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
687             shardManager.tell(new ActorInitialized(), mockShardActor);
688
689             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
690
691             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
692
693             assertTrue("Found path contains " + found.getPath().path().toString(),
694                     found.getPath().path().toString().contains("member-1-shard-default-config"));
695         }};
696     }
697
698     @Test
699     public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
700         new JavaTestKit(getSystem()) {{
701             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
702
703             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
704
705             expectMsgClass(duration("5 seconds"), NotInitializedException.class);
706         }};
707     }
708
709     @Test
710     public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
711         new JavaTestKit(getSystem()) {{
712             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
713
714             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
715
716             // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
717             // delayed until we send ActorInitialized.
718             Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
719                     new Timeout(5, TimeUnit.SECONDS));
720
721             shardManager.tell(new ActorInitialized(), mockShardActor);
722
723             Object resp = Await.result(future, duration("5 seconds"));
724             assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
725         }};
726     }
727
728     @Test
729     public void testOnRecoveryJournalIsCleaned() {
730         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
731                 ImmutableSet.of("foo")));
732         InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
733                 ImmutableSet.of("bar")));
734         InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
735
736         new JavaTestKit(getSystem()) {{
737             TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
738                     Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
739
740             shardManager.underlyingActor().waitForRecoveryComplete();
741             InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
742
743             // Journal entries up to the last one should've been deleted
744             Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
745             synchronized (journal) {
746                 assertEquals("Journal size", 0, journal.size());
747             }
748         }};
749     }
750
751     @Test
752     public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
753         new JavaTestKit(getSystem()) {
754             {
755                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
756
757                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
758                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
759                         memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
760
761                 verify(ready, never()).countDown();
762
763                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
764                         Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION));
765
766                 verify(ready, times(1)).countDown();
767
768             }};
769     }
770
771     @Test
772     public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
773         new JavaTestKit(getSystem()) {
774             {
775                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
776
777                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
778                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
779                         memberId, null, RaftState.Follower.name()));
780
781                 verify(ready, never()).countDown();
782
783                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
784
785                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
786                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
787                         DataStoreVersions.CURRENT_VERSION));
788
789                 verify(ready, times(1)).countDown();
790
791             }};
792     }
793
794     @Test
795     public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
796         new JavaTestKit(getSystem()) {
797             {
798                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
799
800                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
801                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
802                         memberId, null, RaftState.Follower.name()));
803
804                 verify(ready, never()).countDown();
805
806                 shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
807                         "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)),
808                         DataStoreVersions.CURRENT_VERSION));
809
810                 shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
811
812                 verify(ready, times(1)).countDown();
813
814             }};
815     }
816
817     @Test
818     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
819         new JavaTestKit(getSystem()) {
820             {
821                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
822
823                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
824                         "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
825
826                 verify(ready, never()).countDown();
827
828             }};
829     }
830
831
832     @Test
833     public void testByDefaultSyncStatusIsFalse() throws Exception{
834         final Props persistentProps = newShardMgrProps();
835         final TestActorRef<ShardManager> shardManager =
836                 TestActorRef.create(getSystem(), persistentProps);
837
838         ShardManager shardManagerActor = shardManager.underlyingActor();
839
840         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
841     }
842
843     @Test
844     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
845         final Props persistentProps = ShardManager.props(
846                 new MockClusterWrapper(),
847                 new MockConfiguration(),
848                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
849         final TestActorRef<ShardManager> shardManager =
850                 TestActorRef.create(getSystem(), persistentProps);
851
852         ShardManager shardManagerActor = shardManager.underlyingActor();
853         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
854                 RaftState.Follower.name(), RaftState.Leader.name()));
855
856         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
857     }
858
859     @Test
860     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
861         final Props persistentProps = newShardMgrProps();
862         final TestActorRef<ShardManager> shardManager =
863                 TestActorRef.create(getSystem(), persistentProps);
864
865         ShardManager shardManagerActor = shardManager.underlyingActor();
866         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
867                 RaftState.Follower.name(), RaftState.Candidate.name()));
868
869         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
870
871         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
872         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
873
874         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
875     }
876
877     @Test
878     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
879         final Props persistentProps = ShardManager.props(
880                 new MockClusterWrapper(),
881                 new MockConfiguration(),
882                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
883         final TestActorRef<ShardManager> shardManager =
884                 TestActorRef.create(getSystem(), persistentProps);
885
886         ShardManager shardManagerActor = shardManager.underlyingActor();
887         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
888                 RaftState.Candidate.name(), RaftState.Follower.name()));
889
890         // Initially will be false
891         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
892
893         // Send status true will make sync status true
894         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
895
896         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
897
898         // Send status false will make sync status false
899         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
900
901         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
902
903     }
904
905     @Test
906     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
907         final Props persistentProps = ShardManager.props(
908                 new MockClusterWrapper(),
909                 new MockConfiguration() {
910                     @Override
911                     public List<String> getMemberShardNames(String memberName) {
912                         return Arrays.asList("default", "astronauts");
913                     }
914                 },
915                 DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
916         final TestActorRef<ShardManager> shardManager =
917                 TestActorRef.create(getSystem(), persistentProps);
918
919         ShardManager shardManagerActor = shardManager.underlyingActor();
920
921         // Initially will be false
922         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
923
924         // Make default shard leader
925         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
926                 RaftState.Follower.name(), RaftState.Leader.name()));
927
928         // default = Leader, astronauts is unknown so sync status remains false
929         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
930
931         // Make astronauts shard leader as well
932         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
933                 RaftState.Follower.name(), RaftState.Leader.name()));
934
935         // Now sync status should be true
936         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
937
938         // Make astronauts a Follower
939         shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
940                 RaftState.Leader.name(), RaftState.Follower.name()));
941
942         // Sync status is not true
943         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
944
945         // Make the astronauts follower sync status true
946         shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
947
948         // Sync status is now true
949         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
950
951     }
952
953     @Test
954     public void testOnReceiveSwitchShardBehavior() throws Exception {
955         new JavaTestKit(getSystem()) {{
956             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
957
958             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
959             shardManager.tell(new ActorInitialized(), mockShardActor);
960
961             shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef());
962
963             SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class);
964
965             assertEquals(RaftState.Leader, switchBehavior.getNewState());
966             assertEquals(1000, switchBehavior.getNewTerm());
967         }};
968     }
969
970     @Test
971     public void testOnReceiveCreateShard() {
972         new JavaTestKit(getSystem()) {{
973             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
974
975             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
976                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
977
978             SchemaContext schemaContext = TestModel.createTestContext();
979             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
980
981             DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
982                     persistent(false).build();
983             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
984
985             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
986                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
987             shardManager.tell(new CreateShard(config, shardPropsCreator, datastoreContext), getRef());
988
989             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
990
991             shardManager.tell(new FindLocalShard("foo", true), getRef());
992
993             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
994
995             assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
996             assertTrue("Epxected ShardPeerAddressResolver", shardPropsCreator.datastoreContext.getShardRaftConfig().
997                     getPeerAddressResolver() instanceof ShardPeerAddressResolver);
998             assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
999                     new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
1000                     shardPropsCreator.peerAddresses.keySet());
1001             assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
1002                     shardPropsCreator.shardId);
1003             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
1004
1005             // Send CreateShard with same name - should fail.
1006
1007             shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
1008
1009             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1010         }};
1011     }
1012
1013     @Test
1014     public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
1015         new JavaTestKit(getSystem()) {{
1016             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1017                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1018
1019             TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
1020
1021             ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
1022                     "foo", null, Arrays.asList("member-1"));
1023             shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
1024
1025             expectMsgClass(duration("5 seconds"), CreateShardReply.class);
1026
1027             SchemaContext schemaContext = TestModel.createTestContext();
1028             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
1029
1030             shardManager.tell(new FindLocalShard("foo", true), getRef());
1031
1032             expectMsgClass(duration("5 seconds"), LocalShardFound.class);
1033
1034             assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
1035             assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
1036         }};
1037     }
1038
1039     @Test
1040     public void testAddShardReplicaForNonExistentShard() throws Exception {
1041         new JavaTestKit(getSystem()) {{
1042             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1043                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1044
1045             shardManager.tell(new AddShardReplica("model-inventory"), getRef());
1046             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1047
1048             assertEquals("Failure obtained", true,
1049                           (resp.cause() instanceof IllegalArgumentException));
1050         }};
1051     }
1052
1053     @Test
1054     public void testAddShardReplicaForAlreadyCreatedShard() throws Exception {
1055         new JavaTestKit(getSystem()) {{
1056             ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
1057             shardManager.tell(new AddShardReplica("default"), getRef());
1058             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1059             assertEquals("Failure obtained", true,
1060                           (resp.cause() instanceof IllegalArgumentException));
1061         }};
1062     }
1063
1064     @Test
1065     public void testAddShardReplica() throws Exception {
1066         MockConfiguration mockConfig =
1067                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1068                    put("default", Arrays.asList("member-1", "member-2")).
1069                    put("astronauts", Arrays.asList("member-2")).build());
1070
1071         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1072
1073         // Create an ActorSystem ShardManager actor for member-1.
1074         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1075         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1076         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1077         final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1078                 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1079                    new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1080
1081         // Create an ActorSystem ShardManager actor for member-2.
1082         final ActorSystem system2 = ActorSystem.create("cluster-test",
1083             ConfigFactory.load().getConfig("Member2"));
1084         Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1085
1086         String name = new ShardIdentifier("astronauts", "member-2", "config").toString();
1087         final TestActorRef<MockRespondActor> mockShardLeaderActor =
1088             TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
1089         final TestActorRef<ForwardingShardManager> leaderShardManager = TestActorRef.create(system2,
1090                 newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor,
1091                         new ClusterWrapperImpl(system2), mockConfig), shardManagerID);
1092
1093         new JavaTestKit(system1) {{
1094
1095             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1096             leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1097
1098             leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
1099
1100             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
1101             short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
1102             leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
1103                     Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor);
1104             leaderShardManager.tell(new RoleChangeNotification(memberId2,
1105                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor);
1106
1107             newReplicaShardManager.underlyingActor().waitForMemberUp();
1108             leaderShardManager.underlyingActor().waitForMemberUp();
1109
1110             //construct a mock response message
1111             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
1112             mockShardLeaderActor.underlyingActor().updateResponse(response);
1113             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1114             AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
1115                 AddServer.class);
1116             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
1117             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
1118
1119             expectMsgClass(duration("5 seconds"), Status.Success.class);
1120         }};
1121
1122         JavaTestKit.shutdownActorSystem(system1);
1123         JavaTestKit.shutdownActorSystem(system2);
1124     }
1125
1126     @Test
1127     public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
1128         MockConfiguration mockConfig =
1129                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
1130                    put("default", Arrays.asList("member-1", "member-2")).
1131                    put("astronauts", Arrays.asList("member-2")).build());
1132
1133         String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
1134
1135         // Create an ActorSystem ShardManager actor for member-1.
1136         final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
1137         Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
1138         ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
1139         final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
1140                 newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
1141                    new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
1142
1143         new JavaTestKit(system1) {{
1144
1145             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
1146             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
1147             newReplicaShardManager.underlyingActor().waitForMemberUp();
1148
1149             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
1150             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
1151             assertEquals("Failure obtained", true,
1152                           (resp.cause() instanceof RuntimeException));
1153         }};
1154
1155         JavaTestKit.shutdownActorSystem(system1);
1156     }
1157
1158     @Test
1159     public void testRemoveShardReplicaForNonExistentShard() throws Exception {
1160         new JavaTestKit(getSystem()) {{
1161             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
1162                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
1163
1164             shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
1165             Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
1166             assertEquals("Failure obtained", true,
1167                          (resp.cause() instanceof IllegalArgumentException));
1168         }};
1169
1170     }
1171
1172     private static class TestShardPropsCreator implements ShardPropsCreator {
1173         ShardIdentifier shardId;
1174         Map<String, String> peerAddresses;
1175         SchemaContext schemaContext;
1176         DatastoreContext datastoreContext;
1177
1178         @Override
1179         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
1180                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1181             this.shardId = shardId;
1182             this.peerAddresses = peerAddresses;
1183             this.schemaContext = schemaContext;
1184             this.datastoreContext = datastoreContext;
1185             return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
1186         }
1187
1188     }
1189
1190     private static class TestShardManager extends ShardManager {
1191         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
1192
1193         TestShardManager(String shardMrgIDSuffix) {
1194             super(new MockClusterWrapper(), new MockConfiguration(),
1195                     DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
1196                     new PrimaryShardInfoFutureCache());
1197         }
1198
1199         @Override
1200         public void handleRecover(Object message) throws Exception {
1201             try {
1202                 super.handleRecover(message);
1203             } finally {
1204                 if(message instanceof RecoveryCompleted) {
1205                     recoveryComplete.countDown();
1206                 }
1207             }
1208         }
1209
1210         void waitForRecoveryComplete() {
1211             assertEquals("Recovery complete", true,
1212                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
1213         }
1214     }
1215
1216     @SuppressWarnings("serial")
1217     static class TestShardManagerCreator implements Creator<TestShardManager> {
1218         String shardMrgIDSuffix;
1219
1220         TestShardManagerCreator(String shardMrgIDSuffix) {
1221             this.shardMrgIDSuffix = shardMrgIDSuffix;
1222         }
1223
1224         @Override
1225         public TestShardManager create() throws Exception {
1226             return new TestShardManager(shardMrgIDSuffix);
1227         }
1228
1229     }
1230
1231     private static class DelegatingShardManagerCreator implements Creator<ShardManager> {
1232         private static final long serialVersionUID = 1L;
1233         private final Creator<ShardManager> delegate;
1234
1235         public DelegatingShardManagerCreator(Creator<ShardManager> delegate) {
1236             this.delegate = delegate;
1237         }
1238
1239         @Override
1240         public ShardManager create() throws Exception {
1241             return delegate.create();
1242         }
1243     }
1244
1245     private static class ForwardingShardManager extends ShardManager {
1246         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
1247         private CountDownLatch memberUpReceived = new CountDownLatch(1);
1248         private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
1249         private CountDownLatch memberUnreachableReceived = new CountDownLatch(1);
1250         private CountDownLatch memberReachableReceived = new CountDownLatch(1);
1251         private final ActorRef shardActor;
1252         private final String name;
1253
1254         protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
1255                 DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
1256                 ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1257             super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
1258             this.shardActor = shardActor;
1259             this.name = name;
1260         }
1261
1262         @Override
1263         public void handleCommand(Object message) throws Exception {
1264             try{
1265                 super.handleCommand(message);
1266             } finally {
1267                 if(message instanceof FindPrimary) {
1268                     findPrimaryMessageReceived.countDown();
1269                 } else if(message instanceof ClusterEvent.MemberUp) {
1270                     String role = ((ClusterEvent.MemberUp)message).member().roles().head();
1271                     if(!getCluster().getCurrentMemberName().equals(role)) {
1272                         memberUpReceived.countDown();
1273                     }
1274                 } else if(message instanceof ClusterEvent.MemberRemoved) {
1275                     String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
1276                     if(!getCluster().getCurrentMemberName().equals(role)) {
1277                         memberRemovedReceived.countDown();
1278                     }
1279                 } else if(message instanceof ClusterEvent.UnreachableMember) {
1280                     String role = ((ClusterEvent.UnreachableMember)message).member().roles().head();
1281                     if(!getCluster().getCurrentMemberName().equals(role)) {
1282                         memberUnreachableReceived.countDown();
1283                     }
1284                 } else if(message instanceof ClusterEvent.ReachableMember) {
1285                     String role = ((ClusterEvent.ReachableMember)message).member().roles().head();
1286                     if(!getCluster().getCurrentMemberName().equals(role)) {
1287                         memberReachableReceived.countDown();
1288                     }
1289                 }
1290             }
1291         }
1292
1293         @Override
1294         public String persistenceId() {
1295             return name;
1296         }
1297
1298         @Override
1299         protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
1300             return shardActor;
1301         }
1302
1303         void waitForMemberUp() {
1304             assertEquals("MemberUp received", true,
1305                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
1306             memberUpReceived = new CountDownLatch(1);
1307         }
1308
1309         void waitForMemberRemoved() {
1310             assertEquals("MemberRemoved received", true,
1311                     Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
1312             memberRemovedReceived = new CountDownLatch(1);
1313         }
1314
1315         void waitForUnreachableMember() {
1316             assertEquals("UnreachableMember received", true,
1317                 Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
1318                 ));
1319             memberUnreachableReceived = new CountDownLatch(1);
1320         }
1321
1322         void waitForReachableMember() {
1323             assertEquals("ReachableMember received", true,
1324                 Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
1325             memberReachableReceived = new CountDownLatch(1);
1326         }
1327
1328         void verifyFindPrimary() {
1329             assertEquals("FindPrimary received", true,
1330                     Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
1331             findPrimaryMessageReceived = new CountDownLatch(1);
1332         }
1333     }
1334
1335     private static class MockRespondActor extends MessageCollectorActor {
1336
1337         private Object responseMsg;
1338
1339         public void updateResponse(Object response) {
1340             responseMsg = response;
1341         }
1342
1343         @Override
1344         public void onReceive(Object message) throws Exception {
1345             super.onReceive(message);
1346             if (message instanceof AddServer) {
1347                 if (responseMsg != null) {
1348                     getSender().tell(responseMsg, getSelf());
1349                     responseMsg = null;
1350                 }
1351             }
1352         }
1353     }
1354 }