X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManagerTest.java;h=6e37a975ed6aafaa84849a9b1b0289d8f2d60065;hp=ad591a24d2c5f9cc460ce325a0941bc665dee3d9;hb=aa307bc6c06d9bcf8e877553af9babc95c42c39b;hpb=5464f50be733df1bbbe31cf05665d542d3b7c5e7 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index ad591a24d2..6e37a975ed 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -40,14 +40,11 @@ import akka.util.Timeout; import com.google.common.base.Function; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.AbstractMap; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -60,22 +57,16 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import org.apache.commons.lang3.SerializationUtils; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.Shard; -import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules; -import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -89,8 +80,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -103,6 +92,9 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; @@ -111,7 +103,6 @@ import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; @@ -125,7 +116,6 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -137,78 +127,30 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -public class ShardManagerTest extends AbstractActorTest { +public class ShardManagerTest extends AbstractShardManagerTest { private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class); - private static final MemberName MEMBER_1 = MemberName.forName("member-1"); private static final MemberName MEMBER_2 = MemberName.forName("member-2"); private static final MemberName MEMBER_3 = MemberName.forName("member-3"); - private static int ID_COUNTER = 1; - - private final String shardMrgIDSuffix = "config" + ID_COUNTER++; private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - @Mock - private static CountDownLatch ready; - - private static ShardIdentifier mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config"); - - private static TestActorRef mockShardActor = TestActorRef.create(getSystem(), - Props.create(MessageCollectorActor.class), mockShardName.toString()); - - private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() - .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) - .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); - - private final Collection actorSystems = new ArrayList<>(); - - private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - - mockShardActor.underlyingActor().clear(); - } - - @After - public void tearDown() { - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - - for (ActorSystem system: actorSystems) { - JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE); - } - - actorFactory.close(); - } - private ActorSystem newActorSystem(String config) { - ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config)); - actorSystems.add(system); - return system; + return newActorSystem("cluster-test", config); } private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString(); if (system == getSystem()) { - return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name); + return actorFactory.createActor(Props.create(MessageCollectorActor.class), name); } - return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); + return system.actorOf(Props.create(MessageCollectorActor.class), name); } private Props newShardMgrProps() { return newShardMgrProps(new MockConfiguration()); } - private Props newShardMgrProps(Configuration config) { - return newTestShardMgrBuilder(config).props(); - } - private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) { DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class); Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); @@ -216,20 +158,13 @@ public class ShardManagerTest extends AbstractActorTest { return mockFactory; } - private TestShardManager.Builder newTestShardMgrBuilder() { - return TestShardManager.builder(datastoreContextBuilder); - } - - private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) { - return TestShardManager.builder(datastoreContextBuilder).configuration(config); - } - private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() { return newTestShardMgrBuilderWithMockShardActor(mockShardActor); } private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) { - return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor); + return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor) + .distributedDataStore(mock(DistributedDataStore.class)); } @@ -239,7 +174,8 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) { - return newTestShardMgrBuilderWithMockShardActor(shardActor).props(); + return newTestShardMgrBuilderWithMockShardActor(shardActor).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()); } @@ -308,9 +244,9 @@ public class ShardManagerTest extends AbstractActorTest { } }; - final TestActorRef defaultShardActor = actorFactory.createTestActor( + final ActorRef defaultShardActor = actorFactory.createActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default")); - final TestActorRef topologyShardActor = actorFactory.createTestActor( + final ActorRef topologyShardActor = actorFactory.createActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology")); final Map> shardInfoMap = Collections.synchronizedMap( @@ -695,7 +631,7 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. final ActorSystem system1 = newActorSystem("Member1"); - Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); final TestActorRef shardManager1 = TestActorRef.create(system1, newTestShardMgrBuilderWithMockShardActor().cluster( @@ -706,7 +642,7 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); - Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2"); @@ -745,7 +681,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager2.underlyingActor().verifyFindPrimary(); - Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); shardManager1.underlyingActor().waitForMemberRemoved(); @@ -766,7 +702,7 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. final ActorSystem system1 = newActorSystem("Member1"); - Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); @@ -779,7 +715,7 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); - Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); @@ -819,7 +755,7 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", - "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); + "akka://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); @@ -828,7 +764,7 @@ public class ShardManagerTest extends AbstractActorTest { MessageCollectorActor.clearMessages(mockShardActor1); shardManager1.tell( - MockClusterWrapper.createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), + MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); @@ -838,7 +774,7 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); shardManager1.tell( - MockClusterWrapper.createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), + MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForReachableMember(); @@ -854,7 +790,7 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); shardManager1.tell( - MockClusterWrapper.createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), + MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); @@ -862,13 +798,13 @@ public class ShardManagerTest extends AbstractActorTest { // Test FindPrimary wait succeeds after reachable member event. shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", - "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); + "akka://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); shardManager1.tell(new FindPrimary("default", true), getRef()); shardManager1.tell( - MockClusterWrapper.createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"), + MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"), getRef()); RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); @@ -888,7 +824,7 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. final ActorSystem system1 = newActorSystem("Member1"); - Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); @@ -903,7 +839,7 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); - Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); @@ -946,7 +882,7 @@ public class ShardManagerTest extends AbstractActorTest { system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION)); shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2", - "akka.tcp://cluster-test@127.0.0.1:2558"), getRef()); + "akka://cluster-test@127.0.0.1:2558"), getRef()); shardManager1.underlyingActor().waitForUnreachableMember(); @@ -1050,24 +986,6 @@ public class ShardManagerTest extends AbstractActorTest { LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting"); } - @Test - public void testOnRecoveryJournalIsCleaned() { - String persistenceID = "shard-manager-" + shardMrgIDSuffix; - InMemoryJournal.addEntry(persistenceID, 1L, new SchemaContextModules(ImmutableSet.of("foo"))); - InMemoryJournal.addEntry(persistenceID, 2L, new SchemaContextModules(ImmutableSet.of("bar"))); - InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID); - - newTestShardManager(); - - InMemoryJournal.waitForDeleteMessagesComplete(persistenceID); - - // Journal entries up to the last one should've been deleted - Map journal = InMemoryJournal.get(persistenceID); - synchronized (journal) { - assertEquals("Journal size", 0, journal.size()); - } - } - @Test public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { TestShardManager shardManager = newTestShardManager(); @@ -1274,7 +1192,8 @@ public class ShardManagerTest extends AbstractActorTest { datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); SchemaContext schemaContext = TestModel.createTestContext(); shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); @@ -1325,7 +1244,8 @@ public class ShardManagerTest extends AbstractActorTest { datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); @@ -1354,7 +1274,8 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) { { ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); Shard.Builder shardBuilder = Shard.builder(); @@ -1388,8 +1309,8 @@ public class ShardManagerTest extends AbstractActorTest { .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")) .put("astronauts", Collections.emptyList()).build()); - TestActorRef shardManager = actorFactory - .createTestActor(newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig) + .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); Failure failure = kit.expectMsgClass(Failure.class); @@ -1433,9 +1354,8 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet( Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); - byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot(); - assertNotNull("Expected ShardManagerSnapshot", snapshotBytes); - ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes); + ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot(); + assertNotNull("Expected ShardManagerSnapshot", snapshot); assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(snapshot.getShardList())); @@ -1454,9 +1374,10 @@ public class ShardManagerTest extends AbstractActorTest { .put("shard1", Collections.emptyList()).put("shard2", Collections.emptyList()) .put("astronauts", Collections.emptyList()).build()); - ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts")); - DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, - SerializationUtils.serialize(snapshot), Collections.emptyList()); + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts"), Collections.emptyMap()); + DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, snapshot, + Collections.emptyList()); TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig) .restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId())); @@ -1474,11 +1395,9 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); - byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot(); - assertNotNull("Expected ShardManagerSnapshot", snapshotBytes); - snapshot = SerializationUtils.deserialize(snapshotBytes); + assertNotNull("Expected ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot()); assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), - Sets.newHashSet(snapshot.getShardList())); + Sets.newHashSet(datastoreSnapshot.getShardManagerSnapshot().getShardList())); LOG.info("testRestoreFromSnapshot ending"); } @@ -1488,7 +1407,8 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) { { ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new AddShardReplica("model-inventory"), getRef()); Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); @@ -1510,7 +1430,7 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. final ActorSystem system1 = newActorSystem("Member1"); - Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); final TestActorRef newReplicaShardManager = TestActorRef.create(system1, newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor) @@ -1520,7 +1440,7 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-2. final ActorSystem system2 = newActorSystem("Member2"); - Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString(); @@ -1556,7 +1476,8 @@ public class ShardManagerTest extends AbstractActorTest { // Have a dummy snapshot to be overwritten by the new data // persisted. String[] restoredShards = { "default", "people" }; - ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot); Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); @@ -1686,7 +1607,8 @@ public class ShardManagerTest extends AbstractActorTest { ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1"); final TestActorRef shardManager = actorFactory.createTestActor( - newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID); + newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID); shardManager.underlyingActor() .setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); @@ -1737,11 +1659,12 @@ public class ShardManagerTest extends AbstractActorTest { .put("astronauts", Arrays.asList("member-2")).build()); final ActorRef newReplicaShardManager = actorFactory - .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID); + .createActor(newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), shardMgrID); newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", - AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString()); + AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString()); newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); @@ -1757,7 +1680,8 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) { { ActorRef shardManager = actorFactory - .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + .createActor(newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef()); Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class); @@ -1775,9 +1699,8 @@ public class ShardManagerTest extends AbstractActorTest { { String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - final TestActorRef respondActor = actorFactory - .createTestActor(Props.create(MockRespondActor.class, RemoveServer.class, - new RemoveServerReply(ServerChangeStatus.OK, null)), memberId); + final ActorRef respondActor = actorFactory.createActor(Props.create(MockRespondActor.class, + RemoveServer.class, new RemoveServerReply(ServerChangeStatus.OK, null)), memberId); ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); @@ -1809,7 +1732,7 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. final ActorSystem system1 = newActorSystem("Member1"); - Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); final TestActorRef newReplicaShardManager = TestActorRef.create(system1, @@ -1819,7 +1742,7 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-2. final ActorSystem system2 = newActorSystem("Member2"); - Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558")); String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString(); String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; @@ -1835,11 +1758,11 @@ public class ShardManagerTest extends AbstractActorTest { shardManagerID); // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so, - // akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1 + // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1 // However when a shard manager has a local shard which is a follower and a leader that is remote it will // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will // look like so, - // akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1 + // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1 // In this specific case if we did a FindPrimary for shard default from member-1 we would come up // with the address of an actor which does not exist, therefore any message sent to that actor would go to // dead letters. @@ -1946,8 +1869,8 @@ public class ShardManagerTest extends AbstractActorTest { .put("astronauts", Arrays.asList("member-2")) .put("people", Arrays.asList("member-1", "member-2")).build()); - TestActorRef shardManager = actorFactory - .createTestActor(newShardMgrProps(mockConfig)); + TestActorRef shardManager = actorFactory.createTestActor( + newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.underlyingActor().waitForRecoveryComplete(); shardManager.tell(new FindLocalShard("people", false), getRef()); @@ -1980,11 +1903,11 @@ public class ShardManagerTest extends AbstractActorTest { .put("people", Arrays.asList("member-1", "member-2")).build()); String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(); - TestActorRef shard = actorFactory.createTestActor(MessageCollectorActor.props(), - shardId); + ActorRef shard = actorFactory.createActor(MessageCollectorActor.props(), shardId); TestActorRef shardManager = actorFactory - .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()); + .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props() + .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.underlyingActor().waitForRecoveryComplete(); @@ -2011,17 +1934,19 @@ public class ShardManagerTest extends AbstractActorTest { LOG.info("testShardPersistenceWithRestoredData starting"); new JavaTestKit(getSystem()) { { - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() - .put("default", Arrays.asList("member-1", "member-2")) - .put("astronauts", Arrays.asList("member-2")) - .put("people", Arrays.asList("member-1", "member-2")).build()); - String[] restoredShards = { "default", "astronauts" }; - ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder() + .put("default", Arrays.asList("member-1", "member-2")) + .put("astronauts", Arrays.asList("member-2")) + .put("people", Arrays.asList("member-1", "member-2")).build()); + String[] restoredShards = {"default", "astronauts"}; + ShardManagerSnapshot snapshot = + new ShardManagerSnapshot(Arrays.asList(restoredShards), Collections.emptyMap()); InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); // create shardManager to come up with restored data - TestActorRef newRestoredShardManager = actorFactory - .createTestActor(newShardMgrProps(mockConfig)); + TestActorRef newRestoredShardManager = actorFactory.createTestActor( + newShardMgrProps(mockConfig).withDispatcher(Dispatchers.DefaultDispatcherId())); newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); @@ -2053,17 +1978,13 @@ public class ShardManagerTest extends AbstractActorTest { .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")).build()); String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString(); - TestActorRef shard1 = actorFactory.createTestActor( - MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1); + ActorRef shard1 = actorFactory.createActor(MessageCollectorActor.props(), shardId1); String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString(); - TestActorRef shard2 = actorFactory.createTestActor( - MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2); + ActorRef shard2 = actorFactory.createActor(MessageCollectorActor.props(), shardId2); - TestActorRef shardManager = actorFactory - .createTestActor(newTestShardMgrBuilder(mockConfig).addShardActor("shard1", shard1) - .addShardActor("shard2", shard2).props() - .withDispatcher(Dispatchers.DefaultDispatcherId())); + ActorRef shardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig) + .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), shard1); @@ -2099,8 +2020,8 @@ public class ShardManagerTest extends AbstractActorTest { { String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - TestActorRef respondActor = actorFactory - .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + ActorRef respondActor = actorFactory + .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, new ServerChangeReply(ServerChangeStatus.OK, null)), memberId); ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); @@ -2135,8 +2056,8 @@ public class ShardManagerTest extends AbstractActorTest { { String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - TestActorRef respondActor = actorFactory - .createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + ActorRef respondActor = actorFactory + .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId); ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); @@ -2157,7 +2078,7 @@ public class ShardManagerTest extends AbstractActorTest { }; } - private static class TestShardManager extends ShardManager { + public static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final CountDownLatch snapshotPersist = new CountDownLatch(1); private ShardManagerSnapshot snapshot; @@ -2225,7 +2146,7 @@ public class ShardManagerTest extends AbstractActorTest { Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } - void waitForMemberUp() { + public void waitForMemberUp() { assertEquals("MemberUp received", true, Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); memberUpReceived = new CountDownLatch(1); @@ -2260,7 +2181,7 @@ public class ShardManagerTest extends AbstractActorTest { return new Builder(datastoreContextBuilder); } - private static class Builder extends AbstractGenericCreator { + public static class Builder extends AbstractGenericCreator { private ActorRef shardActor; private final Map shardActors = new HashMap<>(); @@ -2313,7 +2234,7 @@ public class ShardManagerTest extends AbstractActorTest { AbstractGenericCreator(Class shardManagerClass) { this.shardManagerClass = shardManagerClass; - cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountdownLatch(ready) + cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready) .primaryShardInfoCache(new PrimaryShardInfoFutureCache()); }