X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=93b57eda2864d416d0f4cd548264a141fc5640b6;hb=99f80f27bee37bb23e345420bf14bb7bb4793c28;hp=fdd66c9f56a7c6a9b81b04aabd906575a909b311;hpb=f41c5e6e6f6e10b36b1e4b1992877e38e718c8fb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index fdd66c9f56..93b57eda28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -5,10 +5,10 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore.shardmanager; import static akka.pattern.Patterns.ask; +import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.Address; @@ -34,17 +34,16 @@ import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -52,22 +51,18 @@ import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.common.actor.Dispatchers; -import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; @@ -81,13 +76,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; -import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -108,20 +101,13 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler; -import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; -import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.common.Empty; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -139,7 +125,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Stores a mapping between a shard name and it's corresponding information // Shard names look like inventory, topology etc and are as specified in // configuration - private final Map localShards = new HashMap<>(); + @VisibleForTesting + final Map localShards = new HashMap<>(); // The type of a ShardManager reflects the type of the datastore itself // A data store could be of type config/operational @@ -149,19 +136,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Configuration configuration; - private final String shardDispatcherPath; + @VisibleForTesting + final String shardDispatcherPath; private final ShardManagerInfo shardManagerMBean; private DatastoreContextFactory datastoreContextFactory; - private final CountDownLatch waitTillReadyCountdownLatch; + private final SettableFuture readinessFuture; private final PrimaryShardInfoFutureCache primaryShardInfoCache; - private final ShardPeerAddressResolver peerAddressResolver; + @VisibleForTesting + final ShardPeerAddressResolver peerAddressResolver; - private SchemaContext schemaContext; + private EffectiveModelContext schemaContext; private DatastoreSnapshot restoreFromSnapshot; @@ -171,22 +160,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Map> shardActorsStopping = new HashMap<>(); - private final String persistenceId; - private final AbstractDataStore dataStore; + private final Set> shardAvailabilityCallbacks = new HashSet<>(); - private ListenerRegistration configListenerReg = null; - private PrefixedShardConfigUpdateHandler configUpdateHandler; + private final String persistenceId; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") ShardManager(final AbstractShardManagerCreator builder) { - this.cluster = builder.getCluster(); - this.configuration = builder.getConfiguration(); - this.datastoreContextFactory = builder.getDatastoreContextFactory(); - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); - this.shardDispatcherPath = - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch(); - this.primaryShardInfoCache = builder.getPrimaryShardInfoCache(); - this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + cluster = builder.getCluster(); + configuration = builder.getConfiguration(); + datastoreContextFactory = builder.getDatastoreContextFactory(); + type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); + shardDispatcherPath = new Dispatchers(context().system().dispatchers()) + .getDispatcherPath(Dispatchers.DispatcherType.Shard); + readinessFuture = builder.getReadinessFuture(); + primaryShardInfoCache = builder.getPrimaryShardInfoCache(); + restoreFromSnapshot = builder.getRestoreFromSnapshot(); String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId(); persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type; @@ -197,11 +185,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { cluster.subscribeToMemberEvents(getSelf()); shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), - "shard-manager-" + this.type, + "shard-manager-" + type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); - - dataStore = builder.getDistributedDataStore(); } @Override @@ -214,11 +200,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.info("Stopping ShardManager {}", persistenceId()); shardManagerMBean.unregisterMBean(); - - if (configListenerReg != null) { - configListenerReg.close(); - configListenerReg = null; - } } @Override @@ -259,14 +240,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); - } else if (message instanceof AddPrefixShardReplica) { - onAddPrefixShardReplica((AddPrefixShardReplica) message); - } else if (message instanceof PrefixShardCreated) { - onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof PrefixShardRemoved) { - onPrefixShardRemoved((PrefixShardRemoved) message); - } else if (message instanceof InitConfigListener) { - onInitConfigListener(); } else if (message instanceof ForwardedAddServerReply) { ForwardedAddServerReply msg = (ForwardedAddServerReply)message; onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, @@ -276,12 +249,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); } else if (message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); - } else if (message instanceof RemovePrefixShardReplica) { - onRemovePrefixShardReplica((RemovePrefixShardReplica) message); } else if (message instanceof WrappedShardResponse) { onWrappedShardResponse((WrappedShardResponse) message); } else if (message instanceof GetSnapshot) { - onGetSnapshot(); + onGetSnapshot((GetSnapshot) message); } else if (message instanceof ServerRemoved) { onShardReplicaRemoved((ServerRemoved) message); } else if (message instanceof ChangeShardMembersVotingStatus) { @@ -301,11 +272,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetShardRole((GetShardRole) message); } else if (message instanceof RunnableMessage) { ((RunnableMessage)message).run(); + } else if (message instanceof RegisterForShardAvailabilityChanges) { + onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message); } else if (message instanceof DeleteSnapshotsFailure) { LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), ((DeleteSnapshotsFailure) message).cause()); } else if (message instanceof DeleteSnapshotsSuccess) { - LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message); + LOG.debug("{}: Successfully deleted prior snapshots", persistenceId()); } else if (message instanceof RegisterRoleChangeListenerReply) { LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId()); } else if (message instanceof ClusterEvent.MemberEvent) { @@ -315,6 +288,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) { + LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message); + + final Consumer callback = message.getCallback(); + shardAvailabilityCallbacks.add(callback); + + getSender().tell(new Status.Success((Registration) + () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self()); + } + private void onGetShardRole(final GetShardRole message) { LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName()); @@ -332,22 +315,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender()); } - private void onInitConfigListener() { - LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - - final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = - org.opendaylight.mdsal.common.api.LogicalDatastoreType - .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); - - if (configUpdateHandler != null) { - configUpdateHandler.close(); - } - - configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, datastoreType); - } - - private void onShutDown() { + void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { @@ -416,46 +384,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, - final String primaryPath, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); - - final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); - - //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message - LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), - primaryPath, shardId); - - Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(primaryPath), - new RemoveServer(shardId.toString()), removeServerTimeout); - - futureObj.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object response) { - if (failure != null) { - shardReplicaOperationsInProgress.remove(shardName); - String msg = String.format("RemoveServer request to leader %s for shard %s failed", - primaryPath, shardName); - - LOG.debug("{}: {}", persistenceId(), msg, failure); - - // FAILURE - sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); - } else { - // SUCCESS - self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); - } - } - }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); - } - private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -476,18 +404,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Future futureObj = ask(getContext().actorSelection(primaryPath), new RemoveServer(shardId.toString()), removeServerTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { shardReplicaOperationsInProgress.remove(shardName); - String msg = String.format("RemoveServer request to leader %s for shard %s failed", - primaryPath, shardName); - - LOG.debug("{}: {}", persistenceId(), msg, failure); + LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath, + shardName, failure); // FAILURE - sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + sender.tell(new Status.Failure(new RuntimeException( + String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName), + failure)), self()); } else { // SUCCESS self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); @@ -520,7 +448,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Future stopFuture = Patterns.gracefulStop(shardActor, FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE); - final CompositeOnComplete onComplete = new CompositeOnComplete() { + final CompositeOnComplete onComplete = new CompositeOnComplete<>() { @Override public void onComplete(final Throwable failure, final Boolean result) { if (failure == null) { @@ -548,7 +476,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { persistShardList(); } - private void onGetSnapshot() { + private void onGetSnapshot(final GetSnapshot getSnapshot) { LOG.debug("{}: onGetSnapshot", persistenceId()); List notInitialized = null; @@ -573,7 +501,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); for (ShardInformation shardInfo: localShards.values()) { - shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor); + shardInfo.getActor().tell(getSnapshot, replyActor); } } @@ -601,32 +529,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); - - final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); - final String shardName = shardId.getShardName(); - - if (isPreviousShardActorStopInProgress(shardName, message)) { - return; - } - - if (localShards.containsKey(shardName)) { - LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); - final PrefixShardConfiguration existing = - configuration.getAllPrefixShardConfigurations().get(config.getPrefix()); - - if (existing != null && existing.equals(config)) { - // we don't have to do nothing here - return; - } - } - - doCreatePrefixShard(config, shardId, shardName); - } - private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); if (stopOnComplete == null) { @@ -647,43 +549,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId, - final String shardName) { - configuration.addPrefixShardConfiguration(config); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(config.getPrefix().getDatastoreType()) - .storeRoot(config.getPrefix().getRootIdentifier()); - DatastoreContext shardDatastoreContext = builder.build(); - - final Map peerAddresses = getPeerAddresses(shardName); - final boolean isActiveMember = true; - - LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", - persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember); - - final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, - shardDatastoreContext, Shard.builder(), peerAddressResolver); - info.setActiveMember(isActiveMember); - localShards.put(info.getShardName(), info); - - if (schemaContext != null) { - info.setSchemaContext(schemaContext); - info.setActor(newShardActor(info)); - } - } - - private void onPrefixShardRemoved(final PrefixShardRemoved message) { - LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); - - final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - - configuration.removePrefixShardConfiguration(prefix); - removeShard(shardId); - } - private void doCreateShard(final CreateShard createShard) { final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); final String shardName = moduleShardConfig.getShardName(); @@ -715,7 +580,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; - peerAddresses = Collections.emptyMap(); + peerAddresses = Map.of(); shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } @@ -746,10 +611,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void checkReady() { if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); + LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type); + readinessFuture.set(Empty.value()); } } @@ -762,6 +625,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { primaryShardInfoCache.remove(shardInformation.getShardName()); + + notifyShardAvailabilityCallbacks(shardInformation); } checkReady(); @@ -770,6 +635,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) { + shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName())); + } + private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) { ShardInformation shardInfo = message.getShardInfo(); @@ -783,7 +652,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf()); } else { LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName()); - message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf()); + message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf()); } } @@ -859,7 +728,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { try { shardId = ShardIdentifier.fromShardIdString(actorName); } catch (IllegalArgumentException e) { - LOG.debug("{}: ignoring actor {}", actorName, e); + LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e); return; } @@ -926,7 +795,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(), - shardInformation.getShardName()); + shardInformation); Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( timeout, getSelf(), @@ -942,7 +811,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else { LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInformation.getShardName()); - getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf()); + getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf()); } return; @@ -951,10 +820,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } - private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) { - return new NoShardLeaderException(null, shardId.toString()); - } - private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) { return new NotInitializedException(String.format( "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); @@ -972,10 +837,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberExited(final ClusterEvent.MemberExited message) { @@ -985,10 +846,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberUp(final ClusterEvent.MemberUp message) { @@ -1021,8 +878,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String shardName = info.getShardName(); String peerId = getShardIdentifier(memberName, shardName).toString(); info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf()); - - info.peerUp(memberName, peerId, getSelf()); } } @@ -1050,9 +905,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.setLeaderAvailable(false); primaryShardInfoCache.remove(info.getShardName()); - } - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); + notifyShardAvailabilityCallbacks(info); + } } } @@ -1063,8 +918,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } - - info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -1122,7 +975,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param message the message to send */ private void updateSchemaContext(final Object message) { - schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext(); LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size()); @@ -1139,7 +992,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String peerId = getShardIdentifier(memberName, shardName).toString() ; String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); info.updatePeerAddress(peerId, peerAddress, getSelf()); - info.peerUp(memberName, peerId, getSelf()); LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); } @@ -1215,7 +1067,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { .getShardInitializationTimeout().duration().$times(2)); Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -1249,8 +1101,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * Create shards that are local to the member on which the ShardManager runs. */ private void createLocalShards() { - MemberName memberName = this.cluster.getCurrentMemberName(); - Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + MemberName memberName = cluster.getCurrentMemberName(); + Collection memberShardNames = configuration.getMemberShardNames(memberName); Map shardSnapshots = new HashMap<>(); if (restoreFromSnapshot != null) { @@ -1268,25 +1120,34 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId); Map peerAddresses = getPeerAddresses(shardName); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, - newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( - shardSnapshots.get(shardName)), peerAddressResolver)); + localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses, + newShardDatastoreContext(shardName), shardSnapshots)); } } + @VisibleForTesting + ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId, + final Map peerAddresses, + final DatastoreContext datastoreContext, + final Map shardSnapshots) { + return new ShardInformation(shardName, shardId, peerAddresses, + datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)), + peerAddressResolver); + } + /** * Given the name of the shard find the addresses of all it's peers. * * @param shardName the shard name */ - private Map getPeerAddresses(final String shardName) { + Map getPeerAddresses(final String shardName) { final Collection members = configuration.getMembersFromShardName(shardName); return getPeerAddresses(shardName, members); } private Map getPeerAddresses(final String shardName, final Collection members) { Map peerAddresses = new HashMap<>(); - MemberName currentMemberName = this.cluster.getCurrentMemberName(); + MemberName currentMemberName = cluster.getCurrentMemberName(); for (MemberName memberName : members) { if (!currentMemberName.equals(memberName)) { @@ -1301,7 +1162,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(10, Duration.create("1 minute"), + return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), (Function) t -> { LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); return SupervisorStrategy.resume(); @@ -1320,68 +1181,35 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) { if (shardReplicaOperationsInProgress.contains(shardName)) { - String msg = String.format("A shard replica operation for %s is already in progress", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName); + sender.tell(new Status.Failure(new IllegalStateException( + String.format("A shard replica operation for %s is already in progress", shardName))), getSelf()); return true; } return false; } - private void onAddPrefixShardReplica(final AddPrefixShardReplica message) { - LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - // Create the localShard - if (schemaContext == null) { - String msg = String.format( - "No SchemaContext is available in order to create a local shard instance for %s", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); - return; - } - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), - getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(), - message.getShardPrefix(), response, getSender()); - if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { - getSelf().tell(runnable, getTargetActor()); - } - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound message) { - sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); - } - }); - } - private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration - if (!this.configuration.isShardConfigured(shardName)) { - String msg = String.format("No module configuration exists for shard %s", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf()); + if (!configuration.isShardConfigured(shardName)) { + LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName); + getSender().tell(new Status.Failure(new IllegalArgumentException( + "No module configuration exists for shard " + shardName)), getSelf()); return; } // Create the localShard if (schemaContext == null) { - String msg = String.format( - "No SchemaContext is available in order to create a local shard instance for %s", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", + persistenceId(), shardName); + getSender().tell(new Status.Failure(new IllegalStateException( + "No SchemaContext is available in order to create a local shard instance for " + shardName)), + getSelf()); return; } @@ -1404,43 +1232,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) { - String msg = String.format("Local shard %s already exists", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); - } - - private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, - final RemotePrimaryShardFound response, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardInformation shardInfo; - final boolean removeShardOnFailure; - ShardInformation existingShardInfo = localShards.get(shardName); - if (existingShardInfo == null) { - removeShardOnFailure = true; - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); - - DatastoreContext datastoreContext = builder.build(); - - shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, - Shard.builder(), peerAddressResolver); - shardInfo.setActiveMember(false); - shardInfo.setSchemaContext(schemaContext); - localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(shardInfo)); - } else { - removeShardOnFailure = false; - shardInfo = existingShardInfo; - } - - execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); + LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName); + sender.tell(new Status.Failure(new AlreadyExistsException( + String.format("Local shard %s already exists", shardName))), getSelf()); } private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { @@ -1492,7 +1286,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object addServerResponse) { if (failure != null) { @@ -1563,7 +1357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { + "Possible causes - there was a problem replicating the data or shard leadership changed " + "while replicating the shard data", leaderPath, shardId.getShardName())); case NO_LEADER: - return createNoShardLeaderException(shardId); + return new NoShardLeaderException(shardId); case NOT_SUPPORTED: return new UnsupportedOperationException(String.format("%s request is not supported for shard %s", serverChange.getSimpleName(), shardId.getShardName())); @@ -1595,32 +1389,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) { - LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), - shardName, persistenceId(), getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - private void doRemoveShardReplicaAsync(final String primaryPath) { - getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(), - primaryPath, getSender()), getTargetActor()); - } - }); - } - private void persistShardList() { List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) { @@ -1629,13 +1397,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations())); + saveSnapshot(updateShardManagerSnapshot(shardList)); } - private ShardManagerSnapshot updateShardManagerSnapshot( - final List shardList, - final Map allPrefixShardConfigurations) { - currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations); + private ShardManagerSnapshot updateShardManagerSnapshot(final List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); return currentSnapshot; } @@ -1696,7 +1462,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, Timeout.apply(30, TimeUnit.SECONDS)); - future.onComplete(new OnComplete() { + future.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -1745,7 +1511,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { .getShardInitializationTimeout().duration().$times(2)); Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -1758,15 +1524,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender); } else if (response instanceof LocalShardNotFound) { - String msg = String.format("Local shard %s does not exist", shardName); - LOG.debug("{}: {}", persistenceId, msg); - sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self()); + LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName); + sender.tell(new Status.Failure(new IllegalArgumentException( + String.format("Local shard %s does not exist", shardName))), self()); } else { - String msg = String.format("Failed to find local shard %s: received response: %s", - shardName, response); - LOG.debug("{}: {}", persistenceId, msg); - sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : - new RuntimeException(msg)), self()); + LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName, + response); + sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response + : new RuntimeException( + String.format("Failed to find local shard %s: received response: %s", shardName, + response))), self()); } } } @@ -1790,15 +1557,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { shardReplicaOperationsInProgress.remove(shardName); if (failure != null) { - String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", - shardActorRef.path()); - LOG.debug("{}: {}", persistenceId(), msg, failure); - sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(), + shardActorRef.path(), failure); + sender.tell(new Status.Failure(new RuntimeException( + String.format("ChangeServersVotingStatus request to local shard %s failed", + shardActorRef.path()), failure)), self()); } else { LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path()); @@ -1937,10 +1705,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { */ protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName, final String persistenceId, final ActorRef shardManagerActor) { - this.targetActor = Preconditions.checkNotNull(targetActor); - this.shardName = Preconditions.checkNotNull(shardName); - this.persistenceId = Preconditions.checkNotNull(persistenceId); - this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor); + this.targetActor = requireNonNull(targetActor); + this.shardName = requireNonNull(shardName); + this.persistenceId = requireNonNull(persistenceId); + this.shardManagerActor = requireNonNull(shardManagerActor); } public ActorRef getTargetActor() { @@ -1960,11 +1728,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onUnknownResponse(final Object response) { - String msg = String.format("Failed to find leader for shard %s: received response: %s", - shardName, response); - LOG.debug("{}: {}", persistenceId, msg); - targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : - new RuntimeException(msg)), shardManagerActor); + LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName, + response); + targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response + : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s", + shardName, response))), shardManagerActor); } }