*/
package org.opendaylight.controller.cluster.datastore.shardmanager;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Failure;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.net.URI;
+import java.time.Duration;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
+import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.XMLNamespace;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-public class ShardManagerTest extends AbstractShardManagerTest {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class ShardManagerTest extends AbstractClusterRefActorTest {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
+ private static final MemberName MEMBER_1 = MemberName.forName("member-1");
private static final MemberName MEMBER_2 = MemberName.forName("member-2");
private static final MemberName MEMBER_3 = MemberName.forName("member-3");
- private static SchemaContext TEST_SCHEMA_CONTEXT;
+ private static int ID_COUNTER = 1;
+ private static ActorRef mockShardActor;
+ private static ShardIdentifier mockShardName;
+ private static SettableFuture<Void> ready;
+ private static EffectiveModelContext TEST_SCHEMA_CONTEXT;
+
+ private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+ private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
+ .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
+ .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
TEST_SCHEMA_CONTEXT = null;
}
+ @Before
+ public void setUp() {
+ ready = SettableFuture.create();
+
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+
+ if (mockShardActor == null) {
+ mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
+ mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString());
+ }
+
+ MessageCollectorActor.clearMessages(mockShardActor);
+ }
+
+ @After
+ public void tearDown() {
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+
+ mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated);
+ mockShardActor = null;
+
+ actorFactory.close();
+ }
+
+ private TestShardManager.Builder newTestShardMgrBuilder() {
+ return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class));
+ }
+
+ private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
+ return TestShardManager.builder(datastoreContextBuilder).configuration(config)
+ .distributedDataStore(mock(DistributedDataStore.class));
+ }
+
+ private Props newShardMgrProps() {
+ return newShardMgrProps(new MockConfiguration());
+ }
+
+ private Props newShardMgrProps(final Configuration config) {
+ return newTestShardMgrBuilder(config).readinessFuture(ready).props();
+ }
+
private ActorSystem newActorSystem(final String config) {
return newActorSystem("cluster-test", config);
}
return system.actorOf(MessageCollectorActor.props(), name);
}
- private Props newShardMgrProps() {
- return newShardMgrProps(new MockConfiguration());
- }
-
private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) {
DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
}
}
- final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ final Creator<ShardManager> creator = new Creator<>() {
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() {
final TestKit kit = new TestKit(getSystem());
- final ActorRef shardManager = actorFactory.createActor(Props.create(
+ final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class,
new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
- assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS));
assertEquals("getShardElectionTimeoutFactor", 6,
shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor());
assertEquals("getShardElectionTimeoutFactor", 7,
shardManager.tell(new FindPrimary("non-existent", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
}
@Test
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
+ LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
- RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-2-shard-default"));
assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
}
@Test
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
}
@Test
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
DataTree mockDataTree = mock(DataTree.class);
shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());
- LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
+ LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5),
LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
// RoleChangeNotification.
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(150));
shardManager.tell(new ActorInitialized(), mockShardActor);
- kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(150));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.tell(
new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()),
mockShardActor);
- kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(150));
DataTree mockDataTree = mock(DataTree.class);
shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree,
DataStoreVersions.CURRENT_VERSION), mockShardActor);
- LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"),
- LocalPrimaryShardFound.class);
+ LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree());
- kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(200));
LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class);
shardManager.tell(new ActorInitialized(), mockShardActor);
- kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ kit.expectNoMessage(Duration.ofMillis(200));
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending");
}
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef());
- kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class);
LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending");
}
shardManager1.underlyingActor().waitForMemberUp();
shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef());
- RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
//
// shardManager1.tell(new FindPrimary("astronauts", false), getRef());
//
-// expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+// expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class);
LOG.info("testOnReceiveFindPrimaryForRemoteShard ending");
}
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
kit.getRef());
shardManager1.underlyingActor().waitForUnreachableMember();
-
- PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
- assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName());
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
kit.getRef());
- MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
-
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
kit.getRef());
shardManager1.underlyingActor().waitForReachableMember();
- PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
- assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName());
- MessageCollectorActor.clearMessages(mockShardActor1);
-
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path1 = found1.getPrimaryPath();
assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
kit.getRef());
- MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
-
// Test FindPrimary wait succeeds after reachable member event.
shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
shardManager1.tell(
MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef());
- RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path2 = found2.getPrimaryPath();
assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config"));
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class);
+ RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class);
assertNull("Expected primaryShardInfoCache entry removed",
primaryShardInfoCache.getIfPresent("default"));
shardManager1.tell(new FindPrimary("default", true), kit.getRef());
- LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class);
+ LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
String path1 = found1.getPrimaryPath();
assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config"));
shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
- LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"),
- LocalPrimaryShardFound.class);
+ LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path + " which must on member-256",
path.contains("member-256-shard-default-config"));
// Make sure leader shard on member-256 is still leader and still in the cache.
shardManager256.tell(new FindPrimary("default", true), kit256.getRef());
- found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class);
+ found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class);
path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path + " which must still not on member-256",
path.contains("member-256-shard-default-config"));
shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef());
- LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+ LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
assertEquals("getShardName", "non-existent", notFound.getShardName());
}
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
assertTrue("Found path contains " + found.getPath().path().toString(),
found.getPath().path().toString().contains("member-1-shard-default-config"));
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
}
@Test
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(
+ shardManager.handleCommand(new RoleChangeNotification(
memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
+ assertFalse(ready.isDone());
- verify(ready, never()).countDown();
-
- shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+ shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
- verify(ready, times(1)).countDown();
+ assertTrue(ready.isDone());
}
@Test
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
-
- verify(ready, never()).countDown();
+ shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+ assertFalse(ready.isDone());
- shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
+ shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
- shardManager.onReceiveCommand(
+ shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
-
- verify(ready, times(1)).countDown();
+ assertTrue(ready.isDone());
}
@Test
TestShardManager shardManager = newTestShardManager();
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
-
- verify(ready, never()).countDown();
+ shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name()));
+ assertFalse(ready.isDone());
- shardManager.onReceiveCommand(
+ shardManager.handleCommand(
new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix,
mock(DataTree.class), DataStoreVersions.CURRENT_VERSION));
- shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
-
- verify(ready, times(1)).countDown();
+ shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString()));
+ assertTrue(ready.isDone());
}
@Test
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
TestShardManager shardManager = newTestShardManager();
- shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
+ shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(),
RaftState.Leader.name()));
-
- verify(ready, never()).countDown();
+ assertFalse(ready.isDone());
}
@Test
public void testByDefaultSyncStatusIsFalse() {
TestShardManager shardManager = newTestShardManager();
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
}
@Test
public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception {
TestShardManager shardManager = newTestShardManager();
- shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+ shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
RaftState.Follower.name(), RaftState.Leader.name()));
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
}
@Test
TestShardManager shardManager = newTestShardManager();
String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+ shardManager.handleCommand(new RoleChangeNotification(shardId,
RaftState.Follower.name(), RaftState.Candidate.name()));
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(
true, shardId));
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
}
@Test
TestShardManager shardManager = newTestShardManager();
String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(shardId,
+ shardManager.handleCommand(new RoleChangeNotification(shardId,
RaftState.Candidate.name(), RaftState.Follower.name()));
// Initially will be false
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Send status true will make sync status true
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId));
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
// Send status false will make sync status false
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId));
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
}
@Test
}));
// Initially will be false
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Make default shard leader
String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId,
+ shardManager.handleCommand(new RoleChangeNotification(defaultShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// default = Leader, astronauts is unknown so sync status remains false
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Make astronauts shard leader as well
String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
- shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Follower.name(), RaftState.Leader.name()));
// Now sync status should be true
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
// Make astronauts a Follower
- shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
+ shardManager.handleCommand(new RoleChangeNotification(astronautsShardId,
RaftState.Leader.name(), RaftState.Follower.name()));
// Sync status is not true
- assertEquals(false, shardManager.getMBean().getSyncStatus());
+ assertFalse(shardManager.getMBean().getSyncStatus());
// Make the astronauts follower sync status true
- shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
+ shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
// Sync status is now true
- assertEquals(true, shardManager.getMBean().getSyncStatus());
+ assertTrue(shardManager.getMBean().getSyncStatus());
LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending");
}
.createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
.withDispatcher(Dispatchers.DefaultDispatcherId()));
- SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
+ EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100)
.persistent(false).build();
Shard.Builder shardBuilder = Shard.builder();
- ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
"foo", null, members("member-1", "member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
- assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent());
+ assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent());
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig()
.getPeerAddressResolver() instanceof ShardPeerAddressResolver);
assertEquals("peerMembers", Sets.newHashSet(
shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
- Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
assertNotNull("Success status is null", success.status());
LOG.info("testOnCreateShard ending");
shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender());
Shard.Builder shardBuilder = Shard.builder();
- ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
"foo", null, members("member-5", "member-6"));
shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder
Shard.Builder shardBuilder = Shard.builder();
- ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module",
"foo", null, members("member-1"));
shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
- SchemaContext schemaContext = TEST_SCHEMA_CONTEXT;
+ EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT;
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
shardManager.tell(new FindLocalShard("foo", true), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext());
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
- Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
-
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
- datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet())));
+ datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet())));
// Add a new replica
datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot");
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(
- Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
+ Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName)));
ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot();
assertNotNull("Expected ShardManagerSnapshot", snapshot);
.put("astronauts", Collections.<String>emptyList()).build());
ShardManagerSnapshot snapshot =
- new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap());
+ new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"));
DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot,
Collections.<ShardSnapshot>emptyList());
TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig)
.withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef());
- Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class);
- assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException);
+ assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException);
}
@Test
// persisted.
String[] restoredShards = { "default", "people" };
ShardManagerSnapshot snapshot =
- new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
+ new ShardManagerSnapshot(Arrays.asList(restoredShards));
InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
AddServer.class);
String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
- kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
- Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
// Send message again to verify previous in progress state is
// cleared
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
- resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
// Send message again with an AddServer timeout to verify the
datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef());
leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending");
}
mockShardActor);
shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef());
- Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class);
LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
}
addServerMsg.getNewServerId());
mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
- Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
terminateWatcher.expectTerminated(mockNewReplicaShardActor);
shardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
mockShardLeaderKit.expectMsgClass(AddServer.class);
mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
- failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class);
+ failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
LOG.info("testAddShardReplicaWithAddServerReplyFailure ending");
AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef());
- Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
- assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
+ assertTrue("Failure obtained", resp.cause() instanceof RuntimeException);
LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending");
}
.withDispatcher(Dispatchers.DefaultDispatcherId()));
shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef());
- Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class);
- assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class);
+ assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException);
}
@Test
RemoveServer.class);
assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
}
@Test
RemoveServer.class);
String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
- kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class);
}
@Test
shardManager.tell(secondServerChange, secondRequestKit.getRef());
- secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class);
+ secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
}
@Test
shardManager.underlyingActor().waitForRecoveryComplete();
shardManager.tell(new FindLocalShard("people", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
shardManager.tell(new FindLocalShard("default", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
// Removed the default shard replica from member-1
ShardIdentifier.Builder builder = new ShardIdentifier.Builder();
.put("people", Arrays.asList("member-1", "member-2")).build());
String[] restoredShards = {"default", "astronauts"};
ShardManagerSnapshot snapshot =
- new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap());
+ new ShardManagerSnapshot(Arrays.asList(restoredShards));
InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
// create shardManager to come up with restored data
newRestoredShardManager.underlyingActor().waitForRecoveryComplete();
newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef());
- LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class);
+ LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class);
assertEquals("for uninitialized shard", "people", notFound.getShardName());
// Verify a local shard is created for the restored shards,
// as the actor initialization
// message is not sent for them
newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef());
- kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class);
LOG.info("testShardPersistenceWithRestoredData ending");
}
.create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(),
Boolean.TRUE));
- kit.expectMsgClass(kit.duration("5 seconds"), Success.class);
+ kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
}
@Test
MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
- Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class);
- assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException);
+ Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class);
+ assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRegisterForShardLeaderChanges() {
+ LOG.info("testRegisterForShardLeaderChanges starting");
+
+ final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ final TestKit kit = new TestKit(getSystem());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ final Consumer<String> mockCallback = mock(Consumer.class);
+ shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
+
+ final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class);
+ final Registration reg = (Registration) reply.status();
+
+ final DataTree mockDataTree = mock(DataTree.class);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ reg.close();
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ LOG.info("testRegisterForShardLeaderChanges ending");
}
public static class TestShardManager extends ShardManager {
}
void waitForRecoveryComplete() {
- assertEquals("Recovery complete", true,
+ assertTrue("Recovery complete",
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
}
public void waitForMemberUp() {
- assertEquals("MemberUp received", true,
+ assertTrue("MemberUp received",
Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
memberUpReceived = new CountDownLatch(1);
}
void waitForMemberRemoved() {
- assertEquals("MemberRemoved received", true,
+ assertTrue("MemberRemoved received",
Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
memberRemovedReceived = new CountDownLatch(1);
}
void waitForUnreachableMember() {
- assertEquals("UnreachableMember received", true,
- Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS
- ));
+ assertTrue("UnreachableMember received",
+ Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS));
memberUnreachableReceived = new CountDownLatch(1);
}
void waitForReachableMember() {
- assertEquals("ReachableMember received", true,
+ assertTrue("ReachableMember received",
Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS));
memberReachableReceived = new CountDownLatch(1);
}
void verifyFindPrimary() {
- assertEquals("FindPrimary received", true,
+ assertTrue("FindPrimary received",
Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
findPrimaryMessageReceived = new CountDownLatch(1);
}
}
Builder shardActor(final ActorRef newShardActor) {
- this.shardActor = newShardActor;
+ shardActor = newShardActor;
return this;
}
}
void verifySnapshotPersisted(final Set<String> shardList) {
- assertEquals("saveSnapshot invoked", true,
+ assertTrue("saveSnapshot invoked",
Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS));
assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList()));
}
AbstractGenericCreator(final Class<C> shardManagerClass) {
this.shardManagerClass = shardManagerClass;
- cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready)
+ cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready)
.primaryShardInfoCache(new PrimaryShardInfoFutureCache());
}