X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManagerTest.java;h=c8c3f6670bc64c5ea54201575c70c08f062385f5;hp=4b2ce82cd51d081d49f82842dc06f932f7cef546;hb=e31fcae52c0255058a583890ddea320e6673e0b0;hpb=bf887b0ecebf65746684691a0cd4d448ad8606f1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index 4b2ce82cd5..c8c3f6670b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -7,22 +7,26 @@ */ package org.opendaylight.controller.cluster.datastore.shardmanager; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.AddressFromURIString; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; @@ -39,13 +43,13 @@ import akka.serialization.Serialization; import akka.testkit.TestActorRef; import akka.testkit.javadsl.TestKit; import akka.util.Timeout; -import com.google.common.base.Function; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; -import java.net.URI; +import java.time.Duration; import java.util.AbstractMap; import java.util.Arrays; import java.util.Collection; @@ -58,18 +62,25 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest; +import org.opendaylight.controller.cluster.datastore.AbstractClusterRefActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -88,8 +99,6 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PeerDown; -import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; @@ -106,6 +115,7 @@ import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; @@ -119,23 +129,38 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.common.XMLNamespace; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -public class ShardManagerTest extends AbstractShardManagerTest { +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class ShardManagerTest extends AbstractClusterRefActorTest { private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class); + private static final MemberName MEMBER_1 = MemberName.forName("member-1"); private static final MemberName MEMBER_2 = MemberName.forName("member-2"); private static final MemberName MEMBER_3 = MemberName.forName("member-3"); - private static SchemaContext TEST_SCHEMA_CONTEXT; + private static int ID_COUNTER = 1; + private static ActorRef mockShardActor; + private static ShardIdentifier mockShardName; + private static SettableFuture ready; + private static EffectiveModelContext TEST_SCHEMA_CONTEXT; + + private final String shardMrgIDSuffix = "config" + ID_COUNTER++; + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() + .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); @@ -149,6 +174,50 @@ public class ShardManagerTest extends AbstractShardManagerTest { TEST_SCHEMA_CONTEXT = null; } + @Before + public void setUp() { + ready = SettableFuture.create(); + + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + + if (mockShardActor == null) { + mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config"); + mockShardActor = getSystem().actorOf(MessageCollectorActor.props(), mockShardName.toString()); + } + + MessageCollectorActor.clearMessages(mockShardActor); + } + + @After + public void tearDown() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + + mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + await().atMost(Duration.ofSeconds(10)).until(mockShardActor::isTerminated); + mockShardActor = null; + + actorFactory.close(); + } + + private TestShardManager.Builder newTestShardMgrBuilder() { + return TestShardManager.builder(datastoreContextBuilder).distributedDataStore(mock(DistributedDataStore.class)); + } + + private TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) { + return TestShardManager.builder(datastoreContextBuilder).configuration(config) + .distributedDataStore(mock(DistributedDataStore.class)); + } + + private Props newShardMgrProps() { + return newShardMgrProps(new MockConfiguration()); + } + + private Props newShardMgrProps(final Configuration config) { + return newTestShardMgrBuilder(config).readinessFuture(ready).props(); + } + private ActorSystem newActorSystem(final String config) { return newActorSystem("cluster-test", config); } @@ -162,10 +231,6 @@ public class ShardManagerTest extends AbstractShardManagerTest { return system.actorOf(MessageCollectorActor.props(), name); } - private Props newShardMgrProps() { - return newShardMgrProps(new MockConfiguration()); - } - private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) { DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class); doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); @@ -291,7 +356,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { } } - final Creator creator = new Creator() { + final Creator creator = new Creator<>() { private static final long serialVersionUID = 1L; @Override public ShardManager create() { @@ -303,12 +368,12 @@ public class ShardManagerTest extends AbstractShardManagerTest { final TestKit kit = new TestKit(getSystem()); - final ActorRef shardManager = actorFactory.createActor(Props.create( + final ActorRef shardManager = actorFactory.createActor(Props.create(ShardManager.class, new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS)); + assertTrue("Shard actors created", newShardActorLatch.await(5, TimeUnit.SECONDS)); assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue().getShardElectionTimeoutFactor()); assertEquals("getShardElectionTimeoutFactor", 7, @@ -345,7 +410,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary("non-existent", false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), PrimaryNotFoundException.class); + kit.expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class); } @Test @@ -370,7 +435,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), + LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); @@ -398,7 +463,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending"); } @@ -424,8 +489,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - RemotePrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), - RemotePrimaryShardFound.class); + RemotePrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); @@ -440,7 +504,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class); } @Test @@ -453,7 +517,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class); } @Test @@ -472,7 +536,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class); DataTree mockDataTree = mock(DataTree.class); shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, @@ -480,7 +544,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); - LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), + LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); @@ -504,30 +568,29 @@ public class ShardManagerTest extends AbstractShardManagerTest { // RoleChangeNotification. shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(Duration.ofMillis(150)); shardManager.tell(new ActorInitialized(), mockShardActor); - kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(Duration.ofMillis(150)); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell( new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor); - kit.expectNoMessage(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(Duration.ofMillis(150)); DataTree mockDataTree = mock(DataTree.class); shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, DataStoreVersions.CURRENT_VERSION), mockShardActor); - LocalPrimaryShardFound primaryFound = kit.expectMsgClass(kit.duration("5 seconds"), - LocalPrimaryShardFound.class); + LocalPrimaryShardFound primaryFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); - kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(Duration.ofMillis(200)); LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending"); } @@ -542,11 +605,11 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - kit.expectMsgClass(kit.duration("2 seconds"), NotInitializedException.class); + kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class); shardManager.tell(new ActorInitialized(), mockShardActor); - kit.expectNoMessage(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + kit.expectNoMessage(Duration.ofMillis(200)); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending"); } @@ -564,7 +627,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending"); } @@ -582,7 +645,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true),kit. getRef()); - kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending"); } @@ -598,7 +661,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); - kit.expectMsgClass(kit.duration("2 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(2), NoShardLeaderException.class); LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending"); } @@ -651,7 +714,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager1.underlyingActor().waitForMemberUp(); shardManager1.tell(new FindPrimary("astronauts", false), kit.getRef()); - RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion()); @@ -666,7 +729,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { // // shardManager1.tell(new FindPrimary("astronauts", false), getRef()); // -// expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); +// expectMsgClass(Duration.ofSeconds(5), PrimaryNotFoundException.class); LOG.info("testOnReceiveFindPrimaryForRemoteShard ending"); } @@ -726,7 +789,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); @@ -734,40 +797,29 @@ public class ShardManagerTest extends AbstractShardManagerTest { kit.getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); - - PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); - assertEquals("getMemberName", MEMBER_2, peerDown.getMemberName()); MessageCollectorActor.clearMessages(mockShardActor1); shardManager1.tell(MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef()); - MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); - shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class); shardManager1.tell(MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef()); shardManager1.underlyingActor().waitForReachableMember(); - PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); - assertEquals("getMemberName", MEMBER_2, peerUp.getMemberName()); - MessageCollectorActor.clearMessages(mockShardActor1); - shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - RemotePrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + RemotePrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class); String path1 = found1.getPrimaryPath(); assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); shardManager1.tell(MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef()); - MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); - // Test FindPrimary wait succeeds after reachable member event. shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", @@ -779,7 +831,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager1.tell( MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), kit.getRef()); - RemotePrimaryShardFound found2 = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + RemotePrimaryShardFound found2 = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class); String path2 = found2.getPrimaryPath(); assertTrue("Unexpected primary path " + path2, path2.contains("member-2-shard-default-config")); @@ -843,7 +895,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - RemotePrimaryShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), RemotePrimaryShardFound.class); + RemotePrimaryShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); @@ -857,7 +909,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NoShardLeaderException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NoShardLeaderException.class); assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default")); @@ -870,7 +922,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager1.tell(new FindPrimary("default", true), kit.getRef()); - LocalPrimaryShardFound found1 = kit.expectMsgClass(kit.duration("5 seconds"), LocalPrimaryShardFound.class); + LocalPrimaryShardFound found1 = kit.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class); String path1 = found1.getPrimaryPath(); assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); @@ -939,8 +991,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager256.tell(new FindPrimary("default", true), kit256.getRef()); - LocalPrimaryShardFound found = kit256.expectMsgClass(kit256.duration("5 seconds"), - LocalPrimaryShardFound.class); + LocalPrimaryShardFound found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path + " which must on member-256", path.contains("member-256-shard-default-config")); @@ -956,7 +1007,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { // Make sure leader shard on member-256 is still leader and still in the cache. shardManager256.tell(new FindPrimary("default", true), kit256.getRef()); - found = kit256.expectMsgClass(kit256.duration("5 seconds"), LocalPrimaryShardFound.class); + found = kit256.expectMsgClass(Duration.ofSeconds(5), LocalPrimaryShardFound.class); path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path + " which must still not on member-256", path.contains("member-256-shard-default-config")); @@ -985,7 +1036,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindLocalShard("non-existent", false), kit.getRef()); - LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class); + LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class); assertEquals("getShardName", "non-existent", notFound.getShardName()); } @@ -1000,7 +1051,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); - LocalShardFound found = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); + LocalShardFound found = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class); assertTrue("Found path contains " + found.getPath().path().toString(), found.getPath().path().toString().contains("member-1-shard-default-config")); @@ -1013,7 +1064,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class); } @Test @@ -1043,15 +1094,13 @@ public class ShardManagerTest extends AbstractShardManagerTest { TestShardManager shardManager = newTestShardManager(); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification( + shardManager.handleCommand(new RoleChangeNotification( memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + assertFalse(ready.isDone()); - verify(ready, never()).countDown(); - - shardManager.onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + shardManager.handleCommand(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), DataStoreVersions.CURRENT_VERSION)); - - verify(ready, times(1)).countDown(); + assertTrue(ready.isDone()); } @Test @@ -1060,17 +1109,15 @@ public class ShardManagerTest extends AbstractShardManagerTest { TestShardManager shardManager = newTestShardManager(); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); - - verify(ready, never()).countDown(); + shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); + assertFalse(ready.isDone()); - shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString())); + shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString())); - shardManager.onReceiveCommand( + shardManager.handleCommand( new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class), DataStoreVersions.CURRENT_VERSION)); - - verify(ready, times(1)).countDown(); + assertTrue(ready.isDone()); } @Test @@ -1079,44 +1126,41 @@ public class ShardManagerTest extends AbstractShardManagerTest { TestShardManager shardManager = newTestShardManager(); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); - - verify(ready, never()).countDown(); + shardManager.handleCommand(new RoleChangeNotification(memberId, null, RaftState.Follower.name())); + assertFalse(ready.isDone()); - shardManager.onReceiveCommand( + shardManager.handleCommand( new ShardLeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix, mock(DataTree.class), DataStoreVersions.CURRENT_VERSION)); - shardManager.onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString())); - - verify(ready, times(1)).countDown(); + shardManager.handleCommand(MockClusterWrapper.createMemberUp("member-2", kit.getRef().path().toString())); + assertTrue(ready.isDone()); } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { TestShardManager shardManager = newTestShardManager(); - shardManager.onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), + shardManager.handleCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name())); - - verify(ready, never()).countDown(); + assertFalse(ready.isDone()); } @Test public void testByDefaultSyncStatusIsFalse() { TestShardManager shardManager = newTestShardManager(); - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); } @Test public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception { TestShardManager shardManager = newTestShardManager(); - shardManager.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + shardManager.handleCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, RaftState.Follower.name(), RaftState.Leader.name())); - assertEquals(true, shardManager.getMBean().getSyncStatus()); + assertTrue(shardManager.getMBean().getSyncStatus()); } @Test @@ -1124,16 +1168,16 @@ public class ShardManagerTest extends AbstractShardManagerTest { TestShardManager shardManager = newTestShardManager(); String shardId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(shardId, + shardManager.handleCommand(new RoleChangeNotification(shardId, RaftState.Follower.name(), RaftState.Candidate.name())); - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate - shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus( + shardManager.handleCommand(new FollowerInitialSyncUpStatus( true, shardId)); - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); } @Test @@ -1141,21 +1185,21 @@ public class ShardManagerTest extends AbstractShardManagerTest { TestShardManager shardManager = newTestShardManager(); String shardId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(shardId, + shardManager.handleCommand(new RoleChangeNotification(shardId, RaftState.Candidate.name(), RaftState.Follower.name())); // Initially will be false - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); // Send status true will make sync status true - shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId)); + shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, shardId)); - assertEquals(true, shardManager.getMBean().getSyncStatus()); + assertTrue(shardManager.getMBean().getSyncStatus()); // Send status false will make sync status false - shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId)); + shardManager.handleCommand(new FollowerInitialSyncUpStatus(false, shardId)); - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); } @Test @@ -1169,36 +1213,36 @@ public class ShardManagerTest extends AbstractShardManagerTest { })); // Initially will be false - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); // Make default shard leader String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(defaultShardId, + shardManager.handleCommand(new RoleChangeNotification(defaultShardId, RaftState.Follower.name(), RaftState.Leader.name())); // default = Leader, astronauts is unknown so sync status remains false - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); // Make astronauts shard leader as well String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix; - shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, + shardManager.handleCommand(new RoleChangeNotification(astronautsShardId, RaftState.Follower.name(), RaftState.Leader.name())); // Now sync status should be true - assertEquals(true, shardManager.getMBean().getSyncStatus()); + assertTrue(shardManager.getMBean().getSyncStatus()); // Make astronauts a Follower - shardManager.onReceiveCommand(new RoleChangeNotification(astronautsShardId, + shardManager.handleCommand(new RoleChangeNotification(astronautsShardId, RaftState.Leader.name(), RaftState.Follower.name())); // Sync status is not true - assertEquals(false, shardManager.getMBean().getSyncStatus()); + assertFalse(shardManager.getMBean().getSyncStatus()); // Make the astronauts follower sync status true - shardManager.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId)); + shardManager.handleCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId)); // Sync status is now true - assertEquals(true, shardManager.getMBean().getSyncStatus()); + assertTrue(shardManager.getMBean().getSyncStatus()); LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending"); } @@ -1234,24 +1278,24 @@ public class ShardManagerTest extends AbstractShardManagerTest { .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) .withDispatcher(Dispatchers.DefaultDispatcherId())); - SchemaContext schemaContext = TEST_SCHEMA_CONTEXT; + EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT; shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100) .persistent(false).build(); Shard.Builder shardBuilder = Shard.builder(); - ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module", "foo", null, members("member-1", "member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), Success.class); + kit.expectMsgClass(Duration.ofSeconds(5), Success.class); shardManager.tell(new FindLocalShard("foo", true), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class); - assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent()); + assertFalse("isRecoveryApplicable", shardBuilder.getDatastoreContext().isPersistent()); assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig() .getPeerAddressResolver() instanceof ShardPeerAddressResolver); assertEquals("peerMembers", Sets.newHashSet( @@ -1267,7 +1311,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef()); - Success success = kit.expectMsgClass(kit.duration("5 seconds"), Success.class); + Success success = kit.expectMsgClass(Duration.ofSeconds(5), Success.class); assertNotNull("Success status is null", success.status()); LOG.info("testOnCreateShard ending"); @@ -1286,14 +1330,14 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), ActorRef.noSender()); Shard.Builder shardBuilder = Shard.builder(); - ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module", "foo", null, members("member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), Success.class); + kit.expectMsgClass(Duration.ofSeconds(5), Success.class); shardManager.tell(new FindLocalShard("foo", true), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class); assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size()); assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder @@ -1312,18 +1356,18 @@ public class ShardManagerTest extends AbstractShardManagerTest { Shard.Builder shardBuilder = Shard.builder(); - ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + ModuleShardConfiguration config = new ModuleShardConfiguration(XMLNamespace.of("foo-ns"), "foo-module", "foo", null, members("member-1")); shardManager.tell(new CreateShard(config, shardBuilder, null), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), Success.class); + kit.expectMsgClass(Duration.ofSeconds(5), Success.class); - SchemaContext schemaContext = TEST_SCHEMA_CONTEXT; + EffectiveModelContext schemaContext = TEST_SCHEMA_CONTEXT; shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); shardManager.tell(new FindLocalShard("foo", true), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class); assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext()); @@ -1359,10 +1403,8 @@ public class ShardManagerTest extends AbstractShardManagerTest { assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot()); - Function shardNameTransformer = ShardSnapshot::getName; - assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet( - datastoreSnapshot.getShardSnapshots().stream().map(shardNameTransformer).collect(Collectors.toSet()))); + datastoreSnapshot.getShardSnapshots().stream().map(ShardSnapshot::getName).collect(Collectors.toSet()))); // Add a new replica @@ -1383,7 +1425,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet( - Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); + Lists.transform(datastoreSnapshot.getShardSnapshots(), ShardSnapshot::getName))); ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot(); assertNotNull("Expected ShardManagerSnapshot", snapshot); @@ -1406,7 +1448,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { .put("astronauts", Collections.emptyList()).build()); ShardManagerSnapshot snapshot = - new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap()); + new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts")); DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot, Collections.emptyList()); TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig) @@ -1441,9 +1483,9 @@ public class ShardManagerTest extends AbstractShardManagerTest { .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new AddShardReplica("model-inventory"), kit.getRef()); - Status.Failure resp = kit.expectMsgClass(kit.duration("2 seconds"), Status.Failure.class); + Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(2), Status.Failure.class); - assertEquals("Failure obtained", true, resp.cause() instanceof IllegalArgumentException); + assertTrue("Failure obtained", resp.cause() instanceof IllegalArgumentException); } @Test @@ -1504,7 +1546,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { // persisted. String[] restoredShards = { "default", "people" }; ShardManagerSnapshot snapshot = - new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); + new ShardManagerSnapshot(Arrays.asList(restoredShards)); InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot); Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); @@ -1517,7 +1559,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { AddServer.class); String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); - kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class); + kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class); InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class); InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID); @@ -1559,17 +1601,17 @@ public class ShardManagerTest extends AbstractShardManagerTest { MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class); - Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class); assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class); // Send message again to verify previous in progress state is // cleared shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef()); - resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class); assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); // Send message again with an AddServer timeout to verify the @@ -1580,10 +1622,10 @@ public class ShardManagerTest extends AbstractShardManagerTest { datastoreContextBuilder.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), kit.getRef()); leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender()); shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + kit.expectMsgClass(Duration.ofSeconds(5), Failure.class); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class); LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending"); } @@ -1604,11 +1646,11 @@ public class ShardManagerTest extends AbstractShardManagerTest { mockShardActor); shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), kit.getRef()); - Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class); assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), LocalShardFound.class); + kit.expectMsgClass(Duration.ofSeconds(5), LocalShardFound.class); LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending"); } @@ -1640,18 +1682,18 @@ public class ShardManagerTest extends AbstractShardManagerTest { addServerMsg.getNewServerId()); mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null)); - Failure failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + Failure failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class); assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass()); shardManager.tell(new FindLocalShard("astronauts", false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class); + kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class); terminateWatcher.expectTerminated(mockNewReplicaShardActor); shardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); mockShardLeaderKit.expectMsgClass(AddServer.class); mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null)); - failure = kit.expectMsgClass(kit.duration("5 seconds"), Failure.class); + failure = kit.expectMsgClass(Duration.ofSeconds(5), Failure.class); assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass()); LOG.info("testAddShardReplicaWithAddServerReplyFailure ending"); @@ -1679,8 +1721,8 @@ public class ShardManagerTest extends AbstractShardManagerTest { AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString()); newReplicaShardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); - Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class); - assertEquals("Failure obtained", true, resp.cause() instanceof RuntimeException); + Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class); + assertTrue("Failure obtained", resp.cause() instanceof RuntimeException); LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending"); } @@ -1693,8 +1735,8 @@ public class ShardManagerTest extends AbstractShardManagerTest { .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), kit.getRef()); - Status.Failure resp = kit.expectMsgClass(kit.duration("10 seconds"), Status.Failure.class); - assertEquals("Failure obtained", true, resp.cause() instanceof PrimaryNotFoundException); + Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(10), Status.Failure.class); + assertTrue("Failure obtained", resp.cause() instanceof PrimaryNotFoundException); } @Test @@ -1723,7 +1765,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { RemoveServer.class); assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(), removeServer.getServerId()); - kit.expectMsgClass(kit.duration("5 seconds"), Success.class); + kit.expectMsgClass(Duration.ofSeconds(5), Success.class); } @Test @@ -1811,7 +1853,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { RemoveServer.class); String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); - kit.expectMsgClass(kit.duration("5 seconds"), Status.Success.class); + kit.expectMsgClass(Duration.ofSeconds(5), Status.Success.class); } @Test @@ -1853,7 +1895,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.tell(secondServerChange, secondRequestKit.getRef()); - secondRequestKit.expectMsgClass(secondRequestKit.duration("5 seconds"), Failure.class); + secondRequestKit.expectMsgClass(Duration.ofSeconds(5), Failure.class); } @Test @@ -1870,10 +1912,10 @@ public class ShardManagerTest extends AbstractShardManagerTest { shardManager.underlyingActor().waitForRecoveryComplete(); shardManager.tell(new FindLocalShard("people", false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class); shardManager.tell(new FindLocalShard("default", false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class); // Removed the default shard replica from member-1 ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); @@ -1931,7 +1973,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { .put("people", Arrays.asList("member-1", "member-2")).build()); String[] restoredShards = {"default", "astronauts"}; ShardManagerSnapshot snapshot = - new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); + new ShardManagerSnapshot(Arrays.asList(restoredShards)); InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); // create shardManager to come up with restored data @@ -1941,7 +1983,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); newRestoredShardManager.tell(new FindLocalShard("people", false), kit.getRef()); - LocalShardNotFound notFound = kit.expectMsgClass(kit.duration("5 seconds"), LocalShardNotFound.class); + LocalShardNotFound notFound = kit.expectMsgClass(Duration.ofSeconds(5), LocalShardNotFound.class); assertEquals("for uninitialized shard", "people", notFound.getShardName()); // Verify a local shard is created for the restored shards, @@ -1949,10 +1991,10 @@ public class ShardManagerTest extends AbstractShardManagerTest { // as the actor initialization // message is not sent for them newRestoredShardManager.tell(new FindLocalShard("default", false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class); newRestoredShardManager.tell(new FindLocalShard("astronauts", false), kit.getRef()); - kit.expectMsgClass(kit.duration("5 seconds"), NotInitializedException.class); + kit.expectMsgClass(Duration.ofSeconds(5), NotInitializedException.class); LOG.info("testShardPersistenceWithRestoredData ending"); } @@ -2028,7 +2070,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { .create("default", MemberName.forName("member-2"), shardMrgIDSuffix).toString(), Boolean.TRUE)); - kit.expectMsgClass(kit.duration("5 seconds"), Success.class); + kit.expectMsgClass(Duration.ofSeconds(5), Success.class); } @Test @@ -2051,8 +2093,63 @@ public class ShardManagerTest extends AbstractShardManagerTest { MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class); - Status.Failure resp = kit.expectMsgClass(kit.duration("5 seconds"), Status.Failure.class); - assertEquals("Failure resposnse", true, resp.cause() instanceof NoShardLeaderException); + Status.Failure resp = kit.expectMsgClass(Duration.ofSeconds(5), Status.Failure.class); + assertTrue("Failure resposnse", resp.cause() instanceof NoShardLeaderException); + } + + @SuppressWarnings("unchecked") + @Test + public void testRegisterForShardLeaderChanges() { + LOG.info("testRegisterForShardLeaderChanges starting"); + + final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + final TestKit kit = new TestKit(getSystem()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + final Consumer mockCallback = mock(Consumer.class); + shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef()); + + final Success reply = kit.expectMsgClass(Duration.ofSeconds(5), Success.class); + final Registration reg = (Registration) reply.status(); + + final DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree, + DataStoreVersions.CURRENT_VERSION), mockShardActor); + + verify(mockCallback, timeout(5000)).accept("default"); + + reset(mockCallback); + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree, + DataStoreVersions.CURRENT_VERSION), mockShardActor); + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyNoMoreInteractions(mockCallback); + + shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree, + DataStoreVersions.CURRENT_VERSION), mockShardActor); + + verify(mockCallback, timeout(5000)).accept("default"); + + reset(mockCallback); + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree, + DataStoreVersions.CURRENT_VERSION), mockShardActor); + + verify(mockCallback, timeout(5000)).accept("default"); + + reset(mockCallback); + reg.close(); + + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree, + DataStoreVersions.CURRENT_VERSION), mockShardActor); + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verifyNoMoreInteractions(mockCallback); + + LOG.info("testRegisterForShardLeaderChanges ending"); } public static class TestShardManager extends ShardManager { @@ -2119,37 +2216,36 @@ public class ShardManagerTest extends AbstractShardManagerTest { } void waitForRecoveryComplete() { - assertEquals("Recovery complete", true, + assertTrue("Recovery complete", Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } public void waitForMemberUp() { - assertEquals("MemberUp received", true, + assertTrue("MemberUp received", Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); memberUpReceived = new CountDownLatch(1); } void waitForMemberRemoved() { - assertEquals("MemberRemoved received", true, + assertTrue("MemberRemoved received", Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS)); memberRemovedReceived = new CountDownLatch(1); } void waitForUnreachableMember() { - assertEquals("UnreachableMember received", true, - Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS - )); + assertTrue("UnreachableMember received", + Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS)); memberUnreachableReceived = new CountDownLatch(1); } void waitForReachableMember() { - assertEquals("ReachableMember received", true, + assertTrue("ReachableMember received", Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS)); memberReachableReceived = new CountDownLatch(1); } void verifyFindPrimary() { - assertEquals("FindPrimary received", true, + assertTrue("FindPrimary received", Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); findPrimaryMessageReceived = new CountDownLatch(1); } @@ -2168,7 +2264,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { } Builder shardActor(final ActorRef newShardActor) { - this.shardActor = newShardActor; + shardActor = newShardActor; return this; } @@ -2186,7 +2282,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { } void verifySnapshotPersisted(final Set shardList) { - assertEquals("saveSnapshot invoked", true, + assertTrue("saveSnapshot invoked", Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); } @@ -2211,7 +2307,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { AbstractGenericCreator(final Class shardManagerClass) { this.shardManagerClass = shardManagerClass; - cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready) + cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).readinessFuture(ready) .primaryShardInfoCache(new PrimaryShardInfoFutureCache()); }