X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=93b57eda2864d416d0f4cd548264a141fc5640b6;hb=1d5ca4009be6c61d7b61989799037ad8f1ab7a75;hp=18bcadf1511c2e9bc247586d0d97d68c984f66f3;hpb=5f587c3e2bfabc09fec49463d04a6fbeba414e9c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 18bcadf151..93b57eda28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore.shardmanager; import static akka.pattern.Patterns.ask; @@ -39,7 +38,6 @@ import com.google.common.util.concurrent.SettableFuture; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -55,19 +53,16 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; @@ -81,13 +76,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; -import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -108,9 +101,8 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +143,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContextFactory datastoreContextFactory; - private final SettableFuture readinessFuture; + private final SettableFuture readinessFuture; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @@ -172,16 +164,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String persistenceId; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") ShardManager(final AbstractShardManagerCreator builder) { - this.cluster = builder.getCluster(); - this.configuration = builder.getConfiguration(); - this.datastoreContextFactory = builder.getDatastoreContextFactory(); - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); - this.shardDispatcherPath = - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.readinessFuture = builder.getReadinessFuture(); - this.primaryShardInfoCache = builder.getPrimaryShardInfoCache(); - this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + cluster = builder.getCluster(); + configuration = builder.getConfiguration(); + datastoreContextFactory = builder.getDatastoreContextFactory(); + type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); + shardDispatcherPath = new Dispatchers(context().system().dispatchers()) + .getDispatcherPath(Dispatchers.DispatcherType.Shard); + readinessFuture = builder.getReadinessFuture(); + primaryShardInfoCache = builder.getPrimaryShardInfoCache(); + restoreFromSnapshot = builder.getRestoreFromSnapshot(); String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId(); persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type; @@ -192,7 +185,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { cluster.subscribeToMemberEvents(getSelf()); shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), - "shard-manager-" + this.type, + "shard-manager-" + type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); } @@ -247,8 +240,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); - } else if (message instanceof AddPrefixShardReplica) { - onAddPrefixShardReplica((AddPrefixShardReplica) message); } else if (message instanceof ForwardedAddServerReply) { ForwardedAddServerReply msg = (ForwardedAddServerReply)message; onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, @@ -258,8 +249,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); } else if (message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); - } else if (message instanceof RemovePrefixShardReplica) { - onRemovePrefixShardReplica((RemovePrefixShardReplica) message); } else if (message instanceof WrappedShardResponse) { onWrappedShardResponse((WrappedShardResponse) message); } else if (message instanceof GetSnapshot) { @@ -395,51 +384,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") - private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, - final String primaryPath, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); - - final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); - - //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message - LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), - primaryPath, shardId); - - Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(primaryPath), - new RemoveServer(shardId.toString()), removeServerTimeout); - - futureObj.onComplete(new OnComplete<>() { - @Override - public void onComplete(final Throwable failure, final Object response) { - if (failure != null) { - shardReplicaOperationsInProgress.remove(shardName); - - LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath, - shardName, failure); - - // FAILURE - sender.tell(new Status.Failure(new RuntimeException( - String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName), - failure)), self()); - } else { - // SUCCESS - self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); - } - } - }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); - } - - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -585,8 +529,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); if (stopOnComplete == null) { @@ -638,7 +580,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; - peerAddresses = Collections.emptyMap(); + peerAddresses = Map.of(); shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } @@ -670,7 +612,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void checkReady() { if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type); - readinessFuture.set(null); + readinessFuture.set(Empty.value()); } } @@ -895,10 +837,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberExited(final ClusterEvent.MemberExited message) { @@ -908,10 +846,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberUp(final ClusterEvent.MemberUp message) { @@ -944,8 +878,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String shardName = info.getShardName(); String peerId = getShardIdentifier(memberName, shardName).toString(); info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf()); - - info.peerUp(memberName, peerId, getSelf()); } } @@ -976,8 +908,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { notifyShardAvailabilityCallbacks(info); } - - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -988,8 +918,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } - - info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -1064,7 +992,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String peerId = getShardIdentifier(memberName, shardName).toString() ; String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); info.updatePeerAddress(peerId, peerAddress, getSelf()); - info.peerUp(memberName, peerId, getSelf()); LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); } @@ -1174,8 +1101,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * Create shards that are local to the member on which the ShardManager runs. */ private void createLocalShards() { - MemberName memberName = this.cluster.getCurrentMemberName(); - Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + MemberName memberName = cluster.getCurrentMemberName(); + Collection memberShardNames = configuration.getMemberShardNames(memberName); Map shardSnapshots = new HashMap<>(); if (restoreFromSnapshot != null) { @@ -1220,7 +1147,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private Map getPeerAddresses(final String shardName, final Collection members) { Map peerAddresses = new HashMap<>(); - MemberName currentMemberName = this.cluster.getCurrentMemberName(); + MemberName currentMemberName = cluster.getCurrentMemberName(); for (MemberName memberName : members) { if (!currentMemberName.equals(memberName)) { @@ -1263,48 +1190,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } - private void onAddPrefixShardReplica(final AddPrefixShardReplica message) { - LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - // Create the localShard - if (schemaContext == null) { - LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", - persistenceId(), shardName); - getSender().tell(new Status.Failure(new IllegalStateException( - "No SchemaContext is available in order to create a local shard instance for " + shardName)), - getSelf()); - return; - } - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), - getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(), - message.getShardPrefix(), response, getSender()); - if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { - getSelf().tell(runnable, getTargetActor()); - } - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound message) { - sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); - } - }); - } - private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration - if (!this.configuration.isShardConfigured(shardName)) { + if (!configuration.isShardConfigured(shardName)) { LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName); getSender().tell(new Status.Failure(new IllegalArgumentException( "No module configuration exists for shard " + shardName)), getSelf()); @@ -1339,52 +1231,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) { LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName); sender.tell(new Status.Failure(new AlreadyExistsException( String.format("Local shard %s already exists", shardName))), getSelf()); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") - private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, - final RemotePrimaryShardFound response, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardInformation shardInfo; - final boolean removeShardOnFailure; - ShardInformation existingShardInfo = localShards.get(shardName); - if (existingShardInfo == null) { - removeShardOnFailure = true; - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); - - DatastoreContext datastoreContext = builder.build(); - - shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, - Shard.builder(), peerAddressResolver); - shardInfo.setActiveMember(false); - shardInfo.setSchemaContext(schemaContext); - localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(shardInfo)); - } else { - removeShardOnFailure = false; - shardInfo = existingShardInfo; - } - - execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); - } - - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1537,32 +1389,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) { - LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), - shardName, persistenceId(), getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - private void doRemoveShardReplicaAsync(final String primaryPath) { - getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(), - primaryPath, getSender()), getTargetActor()); - } - }); - } - private void persistShardList() { List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) { @@ -1571,13 +1397,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations())); + saveSnapshot(updateShardManagerSnapshot(shardList)); } - private ShardManagerSnapshot updateShardManagerSnapshot( - final List shardList, - final Map allPrefixShardConfigurations) { - currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations); + private ShardManagerSnapshot updateShardManagerSnapshot(final List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); return currentSnapshot; }