X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=52f642c98c4991f66e2cf2ba4a3de60619c9d84f;hb=HEAD;hp=43446b5ce2ce760d13c29a92e0a38c79acd74d9b;hpb=3115b8171461584e85f58d87a9f179013cfbb262;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 43446b5ce2..adc686723b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -5,10 +5,9 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore.shardmanager; -import static akka.pattern.Patterns.ask; +import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.Address; @@ -34,17 +33,16 @@ import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -52,22 +50,18 @@ import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.common.actor.Dispatchers; -import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; @@ -81,13 +75,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; -import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -108,14 +100,9 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler; -import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.common.Empty; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; @@ -155,14 +142,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContextFactory datastoreContextFactory; - private final CountDownLatch waitTillReadyCountdownLatch; + private final SettableFuture readinessFuture; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @VisibleForTesting final ShardPeerAddressResolver peerAddressResolver; - private SchemaContext schemaContext; + private EffectiveModelContext modelContext; private DatastoreSnapshot restoreFromSnapshot; @@ -175,20 +162,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set> shardAvailabilityCallbacks = new HashSet<>(); private final String persistenceId; - private final AbstractDataStore dataStore; - - private PrefixedShardConfigUpdateHandler configUpdateHandler; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") ShardManager(final AbstractShardManagerCreator builder) { - this.cluster = builder.getCluster(); - this.configuration = builder.getConfiguration(); - this.datastoreContextFactory = builder.getDatastoreContextFactory(); - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); - this.shardDispatcherPath = - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch(); - this.primaryShardInfoCache = builder.getPrimaryShardInfoCache(); - this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + cluster = builder.getCluster(); + configuration = builder.getConfiguration(); + datastoreContextFactory = builder.getDatastoreContextFactory(); + type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); + shardDispatcherPath = new Dispatchers(context().system().dispatchers()) + .getDispatcherPath(Dispatchers.DispatcherType.Shard); + readinessFuture = builder.getReadinessFuture(); + primaryShardInfoCache = builder.getPrimaryShardInfoCache(); + restoreFromSnapshot = builder.getRestoreFromSnapshot(); String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId(); persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type; @@ -199,11 +184,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { cluster.subscribeToMemberEvents(getSelf()); shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), - "shard-manager-" + this.type, + "shard-manager-" + type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); - - dataStore = builder.getDistributedDataStore(); } @Override @@ -220,95 +203,80 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(final Object message) throws Exception { - if (message instanceof FindPrimary) { - findPrimary((FindPrimary)message); - } else if (message instanceof FindLocalShard) { - findLocalShard((FindLocalShard) message); - } else if (message instanceof UpdateSchemaContext) { - updateSchemaContext(message); - } else if (message instanceof ActorInitialized) { - onActorInitialized(message); - } else if (message instanceof ClusterEvent.MemberUp) { - memberUp((ClusterEvent.MemberUp) message); - } else if (message instanceof ClusterEvent.MemberWeaklyUp) { - memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); - } else if (message instanceof ClusterEvent.MemberExited) { - memberExited((ClusterEvent.MemberExited) message); - } else if (message instanceof ClusterEvent.MemberRemoved) { - memberRemoved((ClusterEvent.MemberRemoved) message); - } else if (message instanceof ClusterEvent.UnreachableMember) { - memberUnreachable((ClusterEvent.UnreachableMember) message); - } else if (message instanceof ClusterEvent.ReachableMember) { - memberReachable((ClusterEvent.ReachableMember) message); - } else if (message instanceof DatastoreContextFactory) { - onDatastoreContextFactory((DatastoreContextFactory) message); - } else if (message instanceof RoleChangeNotification) { - onRoleChangeNotification((RoleChangeNotification) message); - } else if (message instanceof FollowerInitialSyncUpStatus) { - onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); - } else if (message instanceof ShardNotInitializedTimeout) { - onShardNotInitializedTimeout((ShardNotInitializedTimeout) message); - } else if (message instanceof ShardLeaderStateChanged) { - onLeaderStateChanged((ShardLeaderStateChanged) message); - } else if (message instanceof SwitchShardBehavior) { - onSwitchShardBehavior((SwitchShardBehavior) message); - } else if (message instanceof CreateShard) { - onCreateShard((CreateShard)message); - } else if (message instanceof AddShardReplica) { - onAddShardReplica((AddShardReplica) message); - } else if (message instanceof AddPrefixShardReplica) { - onAddPrefixShardReplica((AddPrefixShardReplica) message); - } else if (message instanceof PrefixShardCreated) { - onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof PrefixShardRemoved) { - onPrefixShardRemoved((PrefixShardRemoved) message); - } else if (message instanceof InitConfigListener) { - onInitConfigListener(); - } else if (message instanceof ForwardedAddServerReply) { - ForwardedAddServerReply msg = (ForwardedAddServerReply)message; - onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, - msg.removeShardOnFailure); - } else if (message instanceof ForwardedAddServerFailure) { - ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; + if (message instanceof FindPrimary msg) { + findPrimary(msg); + } else if (message instanceof FindLocalShard msg) { + findLocalShard(msg); + } else if (message instanceof UpdateSchemaContext msg) { + updateSchemaContext(msg); + } else if (message instanceof ActorInitialized msg) { + onActorInitialized(msg); + } else if (message instanceof ClusterEvent.MemberUp msg) { + memberUp(msg); + } else if (message instanceof ClusterEvent.MemberWeaklyUp msg) { + memberWeaklyUp(msg); + } else if (message instanceof ClusterEvent.MemberExited msg) { + memberExited(msg); + } else if (message instanceof ClusterEvent.MemberRemoved msg) { + memberRemoved(msg); + } else if (message instanceof ClusterEvent.UnreachableMember msg) { + memberUnreachable(msg); + } else if (message instanceof ClusterEvent.ReachableMember msg) { + memberReachable(msg); + } else if (message instanceof DatastoreContextFactory msg) { + onDatastoreContextFactory(msg); + } else if (message instanceof RoleChangeNotification msg) { + onRoleChangeNotification(msg); + } else if (message instanceof FollowerInitialSyncUpStatus msg) { + onFollowerInitialSyncStatus(msg); + } else if (message instanceof ShardNotInitializedTimeout msg) { + onShardNotInitializedTimeout(msg); + } else if (message instanceof ShardLeaderStateChanged msg) { + onLeaderStateChanged(msg); + } else if (message instanceof SwitchShardBehavior msg) { + onSwitchShardBehavior(msg); + } else if (message instanceof CreateShard msg) { + onCreateShard(msg); + } else if (message instanceof AddShardReplica msg) { + onAddShardReplica(msg); + } else if (message instanceof ForwardedAddServerReply msg) { + onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure); + } else if (message instanceof ForwardedAddServerFailure msg) { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if (message instanceof RemoveShardReplica) { - onRemoveShardReplica((RemoveShardReplica) message); - } else if (message instanceof RemovePrefixShardReplica) { - onRemovePrefixShardReplica((RemovePrefixShardReplica) message); - } else if (message instanceof WrappedShardResponse) { - onWrappedShardResponse((WrappedShardResponse) message); - } else if (message instanceof GetSnapshot) { - onGetSnapshot(); - } else if (message instanceof ServerRemoved) { - onShardReplicaRemoved((ServerRemoved) message); - } else if (message instanceof ChangeShardMembersVotingStatus) { - onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message); - } else if (message instanceof FlipShardMembersVotingStatus) { - onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message); - } else if (message instanceof SaveSnapshotSuccess) { - onSaveSnapshotSuccess((SaveSnapshotSuccess) message); - } else if (message instanceof SaveSnapshotFailure) { - LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), - ((SaveSnapshotFailure) message).cause()); + } else if (message instanceof RemoveShardReplica msg) { + onRemoveShardReplica(msg); + } else if (message instanceof WrappedShardResponse msg) { + onWrappedShardResponse(msg); + } else if (message instanceof GetSnapshot msg) { + onGetSnapshot(msg); + } else if (message instanceof ServerRemoved msg) { + onShardReplicaRemoved(msg); + } else if (message instanceof ChangeShardMembersVotingStatus msg) { + onChangeShardServersVotingStatus(msg); + } else if (message instanceof FlipShardMembersVotingStatus msg) { + onFlipShardMembersVotingStatus(msg); + } else if (message instanceof SaveSnapshotSuccess msg) { + onSaveSnapshotSuccess(msg); + } else if (message instanceof SaveSnapshotFailure msg) { + LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), msg.cause()); } else if (message instanceof Shutdown) { onShutDown(); } else if (message instanceof GetLocalShardIds) { onGetLocalShardIds(); - } else if (message instanceof GetShardRole) { - onGetShardRole((GetShardRole) message); - } else if (message instanceof RunnableMessage) { - ((RunnableMessage)message).run(); - } else if (message instanceof RegisterForShardAvailabilityChanges) { - onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message); - } else if (message instanceof DeleteSnapshotsFailure) { - LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), - ((DeleteSnapshotsFailure) message).cause()); + } else if (message instanceof GetShardRole msg) { + onGetShardRole(msg); + } else if (message instanceof RunnableMessage msg) { + msg.run(); + } else if (message instanceof RegisterForShardAvailabilityChanges msg) { + onRegisterForShardAvailabilityChanges(msg); + } else if (message instanceof DeleteSnapshotsFailure msg) { + LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), msg.cause()); } else if (message instanceof DeleteSnapshotsSuccess) { LOG.debug("{}: Successfully deleted prior snapshots", persistenceId()); } else if (message instanceof RegisterRoleChangeListenerReply) { LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId()); - } else if (message instanceof ClusterEvent.MemberEvent) { - LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message); + } else if (message instanceof ClusterEvent.MemberEvent msg) { + LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), msg); } else { unknownMessage(message); } @@ -341,22 +309,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender()); } - private void onInitConfigListener() { - LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - - final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = - org.opendaylight.mdsal.common.api.LogicalDatastoreType - .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); - - if (configUpdateHandler != null) { - configUpdateHandler.close(); - } - - configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, datastoreType); - } - - private void onShutDown() { + void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { @@ -425,47 +378,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, - final String primaryPath, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); - - final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); - - //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message - LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), - primaryPath, shardId); - - Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(primaryPath), - new RemoveServer(shardId.toString()), removeServerTimeout); - - futureObj.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object response) { - if (failure != null) { - shardReplicaOperationsInProgress.remove(shardName); - - LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath, - shardName, failure); - - // FAILURE - sender.tell(new Status.Failure(new RuntimeException( - String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName), - failure)), self()); - } else { - // SUCCESS - self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); - } - } - }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); - } - private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -483,10 +395,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { primaryPath, shardId); Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(primaryPath), + Future futureObj = Patterns.ask(getContext().actorSelection(primaryPath), new RemoveServer(shardId.toString()), removeServerTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -530,7 +442,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Future stopFuture = Patterns.gracefulStop(shardActor, FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE); - final CompositeOnComplete onComplete = new CompositeOnComplete() { + final CompositeOnComplete onComplete = new CompositeOnComplete<>() { @Override public void onComplete(final Throwable failure, final Boolean result) { if (failure == null) { @@ -558,7 +470,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { persistShardList(); } - private void onGetSnapshot() { + private void onGetSnapshot(final GetSnapshot getSnapshot) { LOG.debug("{}: onGetSnapshot", persistenceId()); List notInitialized = null; @@ -583,7 +495,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); for (ShardInformation shardInfo: localShards.values()) { - shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor); + shardInfo.getActor().tell(getSnapshot, replyActor); } } @@ -611,32 +523,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); - - final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); - final String shardName = shardId.getShardName(); - - if (isPreviousShardActorStopInProgress(shardName, message)) { - return; - } - - if (localShards.containsKey(shardName)) { - LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); - final PrefixShardConfiguration existing = - configuration.getAllPrefixShardConfigurations().get(config.getPrefix()); - - if (existing != null && existing.equals(config)) { - // we don't have to do nothing here - return; - } - } - - doCreatePrefixShard(config, shardId, shardName); - } - private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); if (stopOnComplete == null) { @@ -657,43 +543,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId, - final String shardName) { - configuration.addPrefixShardConfiguration(config); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(config.getPrefix().getDatastoreType()) - .storeRoot(config.getPrefix().getRootIdentifier()); - DatastoreContext shardDatastoreContext = builder.build(); - - final Map peerAddresses = getPeerAddresses(shardName); - final boolean isActiveMember = true; - - LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", - persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember); - - final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, - shardDatastoreContext, Shard.builder(), peerAddressResolver); - info.setActiveMember(isActiveMember); - localShards.put(info.getShardName(), info); - - if (schemaContext != null) { - info.setSchemaContext(schemaContext); - info.setActor(newShardActor(info)); - } - } - - private void onPrefixShardRemoved(final PrefixShardRemoved message) { - LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); - - final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - - configuration.removePrefixShardConfiguration(prefix); - removeShard(shardId); - } - private void doCreateShard(final CreateShard createShard) { final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); final String shardName = moduleShardConfig.getShardName(); @@ -725,7 +574,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; - peerAddresses = Collections.emptyMap(); + peerAddresses = Map.of(); shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } @@ -739,8 +588,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.setActiveMember(isActiveMember); localShards.put(info.getShardName(), info); - if (schemaContext != null) { - info.setSchemaContext(schemaContext); + if (modelContext != null) { + info.setSchemaContext(modelContext); info.setActor(newShardActor(info)); } } @@ -756,10 +605,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void checkReady() { if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); + LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type); + readinessFuture.set(Empty.value()); } } @@ -768,7 +615,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if (shardInformation != null) { - shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); + shardInformation.setLocalDataTree(leaderStateChanged.localShardDataTree()); shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { primaryShardInfoCache.remove(shardInformation.getShardName()); @@ -860,13 +707,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void onActorInitialized(final Object message) { - final ActorRef sender = getSender(); - - if (sender == null) { - // why is a non-actor sending this message? Just ignore. - return; - } + private void onActorInitialized(final ActorInitialized message) { + final var sender = message.actorRef(); String actorName = sender.path().name(); //find shard name from actor name; actor name is stringified shardId @@ -897,8 +739,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { protected void handleRecover(final Object message) throws Exception { if (message instanceof RecoveryCompleted) { onRecoveryCompleted(); - } else if (message instanceof SnapshotOffer) { - applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot()); + } else if (message instanceof SnapshotOffer msg) { + applyShardManagerSnapshot((ShardManagerSnapshot) msg.snapshot()); } } @@ -984,10 +826,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberExited(final ClusterEvent.MemberExited message) { @@ -997,10 +835,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberUp(final ClusterEvent.MemberUp message) { @@ -1033,8 +867,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String shardName = info.getShardName(); String peerId = getShardIdentifier(memberName, shardName).toString(); info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf()); - - info.peerUp(memberName, peerId, getSelf()); } } @@ -1065,8 +897,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { notifyShardAvailabilityCallbacks(info); } - - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -1077,8 +907,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } - - info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -1135,13 +963,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * * @param message the message to send */ - private void updateSchemaContext(final Object message) { - schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + private void updateSchemaContext(final UpdateSchemaContext message) { + modelContext = message.modelContext(); - LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size()); + LOG.debug("Got updated SchemaContext: # of modules {}", modelContext.getModules().size()); for (ShardInformation info : localShards.values()) { - info.setSchemaContext(schemaContext); + info.setSchemaContext(modelContext); if (info.getActor() == null) { LOG.debug("Creating Shard {}", info.getShardId()); @@ -1153,7 +981,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String peerId = getShardIdentifier(memberName, shardName).toString() ; String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); info.updatePeerAddress(peerId, peerAddress, getSelf()); - info.peerUp(memberName, peerId, getSelf()); LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); } @@ -1186,7 +1013,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { sendResponse(info, message.isWaitUntilReady(), true, () -> { String primaryPath = info.getSerializedLeaderActor(); Object found = canReturnLocalShardState && info.isLeader() - ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().orElseThrow()) : new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -1228,20 +1055,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext() .getShardInitializationTimeout().duration().$times(2)); - Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); - futureObj.onComplete(new OnComplete() { + Future futureObj = Patterns.ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { handler.onFailure(failure); + } else if (response instanceof RemotePrimaryShardFound msg) { + handler.onRemotePrimaryShardFound(msg); + } else if (response instanceof LocalPrimaryShardFound msg) { + handler.onLocalPrimaryFound(msg); } else { - if (response instanceof RemotePrimaryShardFound) { - handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response); - } else if (response instanceof LocalPrimaryShardFound) { - handler.onLocalPrimaryFound((LocalPrimaryShardFound) response); - } else { - handler.onUnknownResponse(response); - } + handler.onUnknownResponse(response); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -1263,8 +1088,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * Create shards that are local to the member on which the ShardManager runs. */ private void createLocalShards() { - MemberName memberName = this.cluster.getCurrentMemberName(); - Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + MemberName memberName = cluster.getCurrentMemberName(); + Collection memberShardNames = configuration.getMemberShardNames(memberName); Map shardSnapshots = new HashMap<>(); if (restoreFromSnapshot != null) { @@ -1288,10 +1113,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @VisibleForTesting - ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId, - Map peerAddresses, - DatastoreContext datastoreContext, - Map shardSnapshots) { + ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId, + final Map peerAddresses, + final DatastoreContext datastoreContext, + final Map shardSnapshots) { return new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)), peerAddressResolver); @@ -1309,7 +1134,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private Map getPeerAddresses(final String shardName, final Collection members) { Map peerAddresses = new HashMap<>(); - MemberName currentMemberName = this.cluster.getCurrentMemberName(); + MemberName currentMemberName = cluster.getCurrentMemberName(); for (MemberName memberName : members) { if (!currentMemberName.equals(memberName)) { @@ -1352,48 +1177,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } - private void onAddPrefixShardReplica(final AddPrefixShardReplica message) { - LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - // Create the localShard - if (schemaContext == null) { - LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", - persistenceId(), shardName); - getSender().tell(new Status.Failure(new IllegalStateException( - "No SchemaContext is available in order to create a local shard instance for " + shardName)), - getSelf()); - return; - } - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), - getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(), - message.getShardPrefix(), response, getSender()); - if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { - getSelf().tell(runnable, getTargetActor()); - } - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound message) { - sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); - } - }); - } - private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration - if (!this.configuration.isShardConfigured(shardName)) { + if (!configuration.isShardConfigured(shardName)) { LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName); getSender().tell(new Status.Failure(new IllegalArgumentException( "No module configuration exists for shard " + shardName)), getSelf()); @@ -1401,7 +1191,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } // Create the localShard - if (schemaContext == null) { + if (modelContext == null) { LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", persistenceId(), shardName); getSender().tell(new Status.Failure(new IllegalStateException( @@ -1434,40 +1224,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String.format("Local shard %s already exists", shardName))), getSelf()); } - private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, - final RemotePrimaryShardFound response, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardInformation shardInfo; - final boolean removeShardOnFailure; - ShardInformation existingShardInfo = localShards.get(shardName); - if (existingShardInfo == null) { - removeShardOnFailure = true; - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); - - DatastoreContext datastoreContext = builder.build(); - - shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, - Shard.builder(), peerAddressResolver); - shardInfo.setActiveMember(false); - shardInfo.setSchemaContext(schemaContext); - localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(shardInfo)); - } else { - removeShardOnFailure = false; - shardInfo = existingShardInfo; - } - - execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); - } - private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1488,7 +1244,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, Shard.builder(), peerAddressResolver); shardInfo.setActiveMember(false); - shardInfo.setSchemaContext(schemaContext); + shardInfo.setSchemaContext(modelContext); localShards.put(shardName, shardInfo); shardInfo.setActor(newShardActor(shardInfo)); } else { @@ -1514,10 +1270,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() .getShardLeaderElectionTimeout().duration()); - final Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + final Future futureObj = Patterns.ask(getContext().actorSelection(response.getPrimaryPath()), new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object addServerResponse) { if (failure != null) { @@ -1581,21 +1337,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private static Exception getServerChangeException(final Class serverChange, final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) { - switch (serverChangeStatus) { - case TIMEOUT: - return new TimeoutException(String.format( - "The shard leader %s timed out trying to replicate the initial data to the new shard %s." - + "Possible causes - there was a problem replicating the data or shard leadership changed " - + "while replicating the shard data", leaderPath, shardId.getShardName())); - case NO_LEADER: - return new NoShardLeaderException(shardId); - case NOT_SUPPORTED: - return new UnsupportedOperationException(String.format("%s request is not supported for shard %s", - serverChange.getSimpleName(), shardId.getShardName())); - default : - return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s", - serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); - } + return switch (serverChangeStatus) { + case TIMEOUT -> new TimeoutException(""" + The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible \ + causes - there was a problem replicating the data or shard leadership changed while replicating the \ + shard data""".formatted(leaderPath, shardId.getShardName())); + case NO_LEADER -> new NoShardLeaderException(shardId); + case NOT_SUPPORTED -> new UnsupportedOperationException( + "%s request is not supported for shard %s".formatted( + serverChange.getSimpleName(), shardId.getShardName())); + default -> new RuntimeException("%s request to leader %s for shard %s failed with status %s".formatted( + serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); + }; } private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) { @@ -1620,32 +1373,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) { - LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), - shardName, persistenceId(), getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - private void doRemoveShardReplicaAsync(final String primaryPath) { - getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(), - primaryPath, getSender()), getTargetActor()); - } - }); - } - private void persistShardList() { List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) { @@ -1654,13 +1381,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations())); + saveSnapshot(updateShardManagerSnapshot(shardList)); } - private ShardManagerSnapshot updateShardManagerSnapshot( - final List shardList, - final Map allPrefixShardConfigurations) { - currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations); + private ShardManagerSnapshot updateShardManagerSnapshot(final List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); return currentSnapshot; } @@ -1718,10 +1443,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { ActorRef sender = getSender(); final String shardName = flipMembersVotingStatus.getShardName(); findLocalShard(shardName, sender, localShardFound -> { - Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, + Future future = Patterns.ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, Timeout.apply(30, TimeUnit.SECONDS)); - future.onComplete(new OnComplete() { + future.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -1769,31 +1494,27 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext() .getShardInitializationTimeout().duration().$times(2)); - Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); - futureObj.onComplete(new OnComplete() { + Future futureObj = Patterns.ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, - failure); + failure); sender.tell(new Status.Failure(new RuntimeException( - String.format("Failed to find local shard %s", shardName), failure)), self()); + String.format("Failed to find local shard %s", shardName), failure)), self()); + } if (response instanceof LocalShardFound msg) { + getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept(msg), sender); + } else if (response instanceof LocalShardNotFound) { + LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName); + sender.tell(new Status.Failure(new IllegalArgumentException( + String.format("Local shard %s does not exist", shardName))), self()); } else { - if (response instanceof LocalShardFound) { - getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), - sender); - } else if (response instanceof LocalShardNotFound) { - LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName); - sender.tell(new Status.Failure(new IllegalArgumentException( - String.format("Local shard %s does not exist", shardName))), self()); - } else { - LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName, - response); - sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response - : new RuntimeException( - String.format("Failed to find local shard %s: received response: %s", shardName, - response))), self()); - } + LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName, + response); + sender.tell(new Status.Failure(response instanceof Throwable throwable ? throwable + : new RuntimeException(String.format("Failed to find local shard %s: received response: %s", + shardName, response))), self()); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -1814,9 +1535,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { changeServersVotingStatus, shardActorRef.path()); Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); - Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); + Future futureObj = Patterns.ask(shardActorRef, changeServersVotingStatus, timeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { shardReplicaOperationsInProgress.remove(shardName); @@ -1964,10 +1685,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { */ protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName, final String persistenceId, final ActorRef shardManagerActor) { - this.targetActor = Preconditions.checkNotNull(targetActor); - this.shardName = Preconditions.checkNotNull(shardName); - this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor); + this.targetActor = requireNonNull(targetActor); + this.shardName = requireNonNull(shardName); + this.persistenceId = requireNonNull(persistenceId); + this.shardManagerActor = requireNonNull(shardManagerActor); } public ActorRef getTargetActor() {