X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FCDSShardAccessImpl.java;h=300759532d89a40cfa9cb41a0fa99961a174c380;hb=14c92df74247c884a43c5aaea2f154992b0ec798;hp=da328566a040bb6797f41a7536afe60233c692d7;hpb=12fcdfe39aa26dcba7fd3bb4d4c68e3d02e65c51;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java index da328566a0..300759532d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java @@ -8,6 +8,9 @@ package org.opendaylight.controller.cluster.sharding; import static akka.actor.ActorRef.noSender; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.PoisonPill; @@ -15,15 +18,13 @@ import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import akka.util.Timeout; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; import org.opendaylight.controller.cluster.dom.api.LeaderLocation; @@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory; import scala.compat.java8.FutureConverters; import scala.concurrent.Future; - /** * Default {@link CDSShardAccess} implementation. Listens on leader location * change events and distributes them to registered listeners. Also updates @@ -56,7 +56,7 @@ final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener private final Collection listeners = ConcurrentHashMap.newKeySet(); private final DOMDataTreeIdentifier prefix; - private final ActorContext actorContext; + private final ActorUtils actorUtils; private final Timeout makeLeaderLocalTimeout; private ActorRef roleChangeListenerActor; @@ -64,36 +64,33 @@ final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener private volatile LeaderLocation currentLeader = LeaderLocation.UNKNOWN; private volatile boolean closed = false; - CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorContext actorContext) { - this.prefix = Preconditions.checkNotNull(prefix); - this.actorContext = Preconditions.checkNotNull(actorContext); + CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorUtils actorUtils) { + this.prefix = requireNonNull(prefix); + this.actorUtils = requireNonNull(actorUtils); this.makeLeaderLocalTimeout = - new Timeout(actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2)); + new Timeout(actorUtils.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2)); // register RoleChangeListenerActor // TODO Maybe we should do this in async final Optional localShardReply = - actorContext.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - Preconditions.checkState(localShardReply.isPresent(), + actorUtils.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); + checkState(localShardReply.isPresent(), "Local shard for {} not present. Cannot register RoleChangeListenerActor", prefix); roleChangeListenerActor = - actorContext.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this)); + actorUtils.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this)); } private void checkNotClosed() { - Preconditions.checkState(!closed, - "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid"); + checkState(!closed, "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid"); } @Override - @Nonnull public DOMDataTreeIdentifier getShardIdentifier() { checkNotClosed(); return prefix; } @Override - @Nonnull public LeaderLocation getLeaderLocation() { checkNotClosed(); // TODO before getting first notification from roleChangeListenerActor @@ -102,7 +99,6 @@ final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener } @Override - @Nonnull public CompletionStage makeLeaderLocal() { // TODO when we have running make leader local operation // we should just return the same completion stage @@ -110,7 +106,7 @@ final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener // TODO can we cache local shard actorRef? final Future localShardReply = - actorContext.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); + actorUtils.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); // we have to tell local shard to make leader local final scala.concurrent.Promise makeLeaderLocalAsk = Futures.promise(); @@ -128,11 +124,11 @@ final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener makeLeaderLocalAsk.failure(failure); } else { makeLeaderLocalAsk - .completeWith(actorContext + .completeWith(actorUtils .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout)); } } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); // we have to transform make leader local request result Future makeLeaderLocalFuture = makeLeaderLocalAsk.future() @@ -151,19 +147,18 @@ final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener // wrap exception in LeadershipTransferFailedEx return new LeadershipTransferFailedException("Leadership transfer failed", parameter); } - }, actorContext.getClientDispatcher()); + }, actorUtils.getClientDispatcher()); return FutureConverters.toJava(makeLeaderLocalFuture); } @Override - @Nonnull public LeaderLocationListenerRegistration - registerLeaderLocationListener(@Nonnull final L listener) { + registerLeaderLocationListener(final L listener) { checkNotClosed(); - Preconditions.checkNotNull(listener); - Preconditions.checkArgument(!listeners.contains(listener), - "Listener {} is already registered with ShardAccess {}", listener, this); + requireNonNull(listener); + checkArgument(!listeners.contains(listener), "Listener %s is already registered with ShardAccess %s", listener, + this); LOG.debug("Registering LeaderLocationListener {}", listener); @@ -184,7 +179,7 @@ final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void onLeaderLocationChanged(@Nonnull final LeaderLocation location) { + public void onLeaderLocationChanged(final LeaderLocation location) { if (closed) { // we are closed already. Do not dispatch any new leader location // change events.