X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=52f642c98c4991f66e2cf2ba4a3de60619c9d84f;hp=fee555470f7fefd74dcc136c7595f1f0023d776c;hb=bb10634078d038fcccb4d5542a79f062e3835ad3;hpb=e66759266dc43d5f58b2837aca5047b42c205e4a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index fee555470f..52f642c98c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -35,6 +35,7 @@ import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.SettableFuture; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; @@ -45,7 +46,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -116,7 +116,7 @@ import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; @@ -156,14 +156,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContextFactory datastoreContextFactory; - private final CountDownLatch waitTillReadyCountdownLatch; + private final SettableFuture readinessFuture; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @VisibleForTesting final ShardPeerAddressResolver peerAddressResolver; - private SchemaContext schemaContext; + private EffectiveModelContext schemaContext; private DatastoreSnapshot restoreFromSnapshot; @@ -187,7 +187,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch(); + this.readinessFuture = builder.getReadinessFuture(); this.primaryShardInfoCache = builder.getPrimaryShardInfoCache(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); @@ -279,7 +279,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if (message instanceof WrappedShardResponse) { onWrappedShardResponse((WrappedShardResponse) message); } else if (message instanceof GetSnapshot) { - onGetSnapshot(); + onGetSnapshot((GetSnapshot) message); } else if (message instanceof ServerRemoved) { onShardReplicaRemoved((ServerRemoved) message); } else if (message instanceof ChangeShardMembersVotingStatus) { @@ -357,7 +357,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { configUpdateHandler.initListener(dataStore, datastoreType); } - private void onShutDown() { + void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { if (info.getActor() != null) { @@ -448,7 +448,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Future futureObj = ask(getContext().actorSelection(primaryPath), new RemoveServer(shardId.toString()), removeServerTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -491,7 +491,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Future futureObj = ask(getContext().actorSelection(primaryPath), new RemoveServer(shardId.toString()), removeServerTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -535,7 +535,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Future stopFuture = Patterns.gracefulStop(shardActor, FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE); - final CompositeOnComplete onComplete = new CompositeOnComplete() { + final CompositeOnComplete onComplete = new CompositeOnComplete<>() { @Override public void onComplete(final Throwable failure, final Boolean result) { if (failure == null) { @@ -563,7 +563,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { persistShardList(); } - private void onGetSnapshot() { + private void onGetSnapshot(final GetSnapshot getSnapshot) { LOG.debug("{}: onGetSnapshot", persistenceId()); List notInitialized = null; @@ -588,7 +588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); for (ShardInformation shardInfo: localShards.values()) { - shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor); + shardInfo.getActor().tell(getSnapshot, replyActor); } } @@ -761,10 +761,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void checkReady() { if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); + LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type); + readinessFuture.set(null); } } @@ -1141,7 +1139,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param message the message to send */ private void updateSchemaContext(final Object message) { - schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext(); LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size()); @@ -1234,7 +1232,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { .getShardInitializationTimeout().duration().$times(2)); Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -1528,7 +1526,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object addServerResponse) { if (failure != null) { @@ -1732,7 +1730,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, Timeout.apply(30, TimeUnit.SECONDS)); - future.onComplete(new OnComplete() { + future.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -1781,7 +1779,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { .getShardInitializationTimeout().duration().$times(2)); Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { @@ -1827,7 +1825,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); - futureObj.onComplete(new OnComplete() { + futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { shardReplicaOperationsInProgress.remove(shardName);