X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=18bcadf1511c2e9bc247586d0d97d68c984f66f3;hp=d1b9aba9d666268c6ab159f11e543ebc674fefe5;hb=5f587c3e2bfabc09fec49463d04a6fbeba414e9c;hpb=6ef0b898f2117a4bb3a510c0df7af340f4fc8eca diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index d1b9aba9d6..18bcadf151 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -35,6 +35,7 @@ import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.SettableFuture; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; @@ -45,7 +46,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -53,7 +53,6 @@ import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.common.actor.Dispatchers; -import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -109,10 +108,6 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler; -import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -156,7 +151,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContextFactory datastoreContextFactory; - private final CountDownLatch waitTillReadyCountdownLatch; + private final SettableFuture readinessFuture; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @@ -176,9 +171,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set> shardAvailabilityCallbacks = new HashSet<>(); private final String persistenceId; - private final AbstractDataStore dataStore; - - private PrefixedShardConfigUpdateHandler configUpdateHandler; ShardManager(final AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); @@ -187,7 +179,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch(); + this.readinessFuture = builder.getReadinessFuture(); this.primaryShardInfoCache = builder.getPrimaryShardInfoCache(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); @@ -203,8 +195,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { "shard-manager-" + this.type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); - - dataStore = builder.getDistributedDataStore(); } @Override @@ -259,12 +249,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddShardReplica((AddShardReplica) message); } else if (message instanceof AddPrefixShardReplica) { onAddPrefixShardReplica((AddPrefixShardReplica) message); - } else if (message instanceof PrefixShardCreated) { - onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof PrefixShardRemoved) { - onPrefixShardRemoved((PrefixShardRemoved) message); - } else if (message instanceof InitConfigListener) { - onInitConfigListener(); } else if (message instanceof ForwardedAddServerReply) { ForwardedAddServerReply msg = (ForwardedAddServerReply)message; onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, @@ -342,21 +326,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender()); } - private void onInitConfigListener() { - LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - - final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = - org.opendaylight.mdsal.common.api.LogicalDatastoreType - .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); - - if (configUpdateHandler != null) { - configUpdateHandler.close(); - } - - configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, datastoreType); - } - void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { @@ -616,32 +585,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); - - final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); - final String shardName = shardId.getShardName(); - - if (isPreviousShardActorStopInProgress(shardName, message)) { - return; - } - - if (localShards.containsKey(shardName)) { - LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); - final PrefixShardConfiguration existing = - configuration.getAllPrefixShardConfigurations().get(config.getPrefix()); - - if (existing != null && existing.equals(config)) { - // we don't have to do nothing here - return; - } - } - - doCreatePrefixShard(config, shardId, shardName); - } - + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); if (stopOnComplete == null) { @@ -662,43 +607,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId, - final String shardName) { - configuration.addPrefixShardConfiguration(config); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(config.getPrefix().getDatastoreType()) - .storeRoot(config.getPrefix().getRootIdentifier()); - DatastoreContext shardDatastoreContext = builder.build(); - - final Map peerAddresses = getPeerAddresses(shardName); - final boolean isActiveMember = true; - - LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", - persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember); - - final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, - shardDatastoreContext, Shard.builder(), peerAddressResolver); - info.setActiveMember(isActiveMember); - localShards.put(info.getShardName(), info); - - if (schemaContext != null) { - info.setSchemaContext(schemaContext); - info.setActor(newShardActor(info)); - } - } - - private void onPrefixShardRemoved(final PrefixShardRemoved message) { - LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); - - final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - - configuration.removePrefixShardConfiguration(prefix); - removeShard(shardId); - } - private void doCreateShard(final CreateShard createShard) { final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); final String shardName = moduleShardConfig.getShardName(); @@ -761,10 +669,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void checkReady() { if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); + LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type); + readinessFuture.set(null); } }