From dea5df204ec3a8d98840979e976952fbf653296f Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 31 Dec 2023 05:44:51 +0100 Subject: [PATCH] Refactor ActorInitialized This message is only ever sent from Shard to ShardManager, which its parent. Change it to a record holding the actor reference, so that users do not need to muck with getSender(). Change-Id: Ib26c328e60c8bfd86075e12c39c271978f157d15 Signed-off-by: Robert Varga --- .../controller/cluster/datastore/Shard.java | 4 +- .../datastore/messages/ActorInitialized.java | 11 ++-- .../datastore/shardmanager/ShardManager.java | 11 +--- .../shardmanager/ShardManagerTest.java | 64 +++++++++---------- 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index ac32fe5c32..a7927d824d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -971,13 +971,13 @@ public class Shard extends RaftActor { restoreFromSnapshot = null; //notify shard manager - getContext().parent().tell(new ActorInitialized(), getSelf()); + getContext().parent().tell(new ActorInitialized(getSelf()), ActorRef.noSender()); // Being paranoid here - this method should only be called once but just in case... if (txCommitTimeoutCheckSchedule == null) { // Schedule a message to be periodically sent to check if the current in-progress // transaction should be expired and aborted. - FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); + final var period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule( period, period, getSelf(), TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java index 09c5b739cf..cd6e0d8cfa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java @@ -7,11 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore.messages; -import java.io.Serializable; +import static java.util.Objects.requireNonNull; -public class ActorInitialized implements Serializable { - private static final long serialVersionUID = 1L; +import akka.actor.ActorRef; +import org.eclipse.jdt.annotation.NonNullByDefault; - public ActorInitialized() { +@NonNullByDefault +public record ActorInitialized(ActorRef actorRef) { + public ActorInitialized { + requireNonNull(actorRef); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 48917a6291..22bff493cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -708,12 +708,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onActorInitialized(final ActorInitialized message) { - final ActorRef sender = getSender(); - - if (sender == null) { - // why is a non-actor sending this message? Just ignore. - return; - } + final var sender = message.actorRef(); String actorName = sender.path().name(); //find shard name from actor name; actor name is stringified shardId @@ -744,8 +739,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { protected void handleRecover(final Object message) throws Exception { if (message instanceof RecoveryCompleted) { onRecoveryCompleted(); - } else if (message instanceof SnapshotOffer) { - applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot()); + } else if (message instanceof SnapshotOffer msg) { + applyShardManagerSnapshot((ShardManagerSnapshot) msg.snapshot()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index 9d31d21544..b56d8020cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -423,7 +423,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); DataTree mockDataTree = mock(DataTree.class); shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, @@ -452,7 +452,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; @@ -476,7 +476,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; MockClusterWrapper.sendMemberUp(shardManager, "member-2", kit.getRef().path().toString()); @@ -514,7 +514,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef()); @@ -528,7 +528,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell( @@ -571,7 +571,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { kit.expectNoMessage(Duration.ofMillis(150)); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); kit.expectNoMessage(Duration.ofMillis(150)); @@ -608,7 +608,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { kit.expectMsgClass(Duration.ofSeconds(2), NotInitializedException.class); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); kit.expectNoMessage(Duration.ofMillis(200)); @@ -622,7 +622,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, RaftState.Candidate.name()), mockShardActor); @@ -640,7 +640,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, RaftState.IsolatedLeader.name()), mockShardActor); @@ -658,7 +658,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), kit.getRef()); @@ -703,7 +703,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager2.tell(new ActorInitialized(), mockShardActor2); + shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender()); String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; @@ -771,8 +771,8 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final TestKit kit = new TestKit(system1); shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager1.tell(new ActorInitialized(), mockShardActor1); - shardManager2.tell(new ActorInitialized(), mockShardActor2); + shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender()); + shardManager2.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender()); String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; @@ -877,8 +877,8 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final TestKit kit = new TestKit(system1); shardManager1.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager1.tell(new ActorInitialized(), mockShardActor1); - shardManager2.tell(new ActorInitialized(), mockShardActor2); + shardManager1.tell(new ActorInitialized(mockShardActor1), ActorRef.noSender()); + shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender()); String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; @@ -973,8 +973,8 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final TestKit kit256 = new TestKit(system256); shardManager256.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef()); shardManager2.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit256.getRef()); - shardManager256.tell(new ActorInitialized(), mockShardActor256); - shardManager2.tell(new ActorInitialized(), mockShardActor2); + shardManager256.tell(new ActorInitialized(mockShardActor256), ActorRef.noSender()); + shardManager2.tell(new ActorInitialized(mockShardActor2), ActorRef.noSender()); String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix; String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; @@ -1048,7 +1048,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), kit.getRef()); @@ -1082,7 +1082,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), new Timeout(5, TimeUnit.SECONDS)); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); Object resp = Await.result(future, kit.duration("5 seconds")); assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); @@ -1254,7 +1254,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); shardManager.tell(new SwitchShardBehavior(mockShardName, RaftState.Leader, 1000), kit.getRef()); @@ -1530,7 +1530,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender()); short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; leaderShardManager.tell( @@ -1581,7 +1581,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { .createTestActor(newPropsShardMgrWithMockShardActor(), shardMgrID); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix; AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null); @@ -1639,7 +1639,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), kit.getRef()); shardManager.tell( @@ -1754,7 +1754,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender()); shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), kit.getRef()); shardManager.tell( @@ -1826,8 +1826,8 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { newReplicaShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); leaderShardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + leaderShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender()); + newReplicaShardManager.tell(new ActorInitialized(mockShardLeaderActor), ActorRef.noSender()); short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; leaderShardManager.tell( @@ -1948,7 +1948,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { shardManager.underlyingActor().waitForRecoveryComplete(); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), shard); + shardManager.tell(new ActorInitialized(shard), ActorRef.noSender()); waitForShardInitialized(shardManager, "people", kit); waitForShardInitialized(shardManager, "default", kit); @@ -2017,8 +2017,8 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { .addShardActor("shard1", shard1).addShardActor("shard2", shard2).props()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), shard1); - shardManager.tell(new ActorInitialized(), shard2); + shardManager.tell(new ActorInitialized(shard1), ActorRef.noSender()); + shardManager.tell(new ActorInitialized(shard2), ActorRef.noSender()); FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); Future stopFuture = Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE); @@ -2054,7 +2054,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender()); shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), DataStoreVersions.CURRENT_VERSION), kit.getRef()); shardManager.tell( @@ -2086,7 +2086,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ActorInitialized(respondActor), ActorRef.noSender()); shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()), respondActor); shardManager.tell( @@ -2109,7 +2109,7 @@ public class ShardManagerTest extends AbstractClusterRefActorTest { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TEST_SCHEMA_CONTEXT), kit.getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ActorInitialized(mockShardActor), ActorRef.noSender()); final Consumer mockCallback = mock(Consumer.class); shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef()); -- 2.36.6