package org.opendaylight.controller.cluster.sharding;
import static akka.actor.ActorRef.noSender;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
-
/**
* Default {@link CDSShardAccess} implementation. Listens on leader location
* change events and distributes them to registered listeners. Also updates
private volatile boolean closed = false;
CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorContext actorContext) {
- this.prefix = Preconditions.checkNotNull(prefix);
- this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.prefix = requireNonNull(prefix);
+ this.actorContext = requireNonNull(actorContext);
this.makeLeaderLocalTimeout =
new Timeout(actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2));
// TODO Maybe we should do this in async
final Optional<ActorRef> localShardReply =
actorContext.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
- Preconditions.checkState(localShardReply.isPresent(),
+ checkState(localShardReply.isPresent(),
"Local shard for {} not present. Cannot register RoleChangeListenerActor", prefix);
roleChangeListenerActor =
actorContext.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this));
}
private void checkNotClosed() {
- Preconditions.checkState(!closed,
- "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid");
+ checkState(!closed, "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid");
}
@Override
- @Nonnull
public DOMDataTreeIdentifier getShardIdentifier() {
checkNotClosed();
return prefix;
}
@Override
- @Nonnull
public LeaderLocation getLeaderLocation() {
checkNotClosed();
// TODO before getting first notification from roleChangeListenerActor
}
@Override
- @Nonnull
public CompletionStage<Void> makeLeaderLocal() {
// TODO when we have running make leader local operation
// we should just return the same completion stage
final scala.concurrent.Promise<Object> makeLeaderLocalAsk = Futures.promise();
localShardReply.onComplete(new OnComplete<ActorRef>() {
@Override
- public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+ public void onComplete(final Throwable failure, final ActorRef actorRef) {
if (failure instanceof LocalShardNotFoundException) {
LOG.debug("No local shard found for {} - Cannot request leadership transfer to local shard.",
getShardIdentifier(), failure);
}
@Override
- @Nonnull
public <L extends LeaderLocationListener> LeaderLocationListenerRegistration<L>
- registerLeaderLocationListener(@Nonnull final L listener) {
+ registerLeaderLocationListener(final L listener) {
checkNotClosed();
- Preconditions.checkNotNull(listener);
- Preconditions.checkArgument(!listeners.contains(listener),
- "Listener {} is already registered with ShardAccess {}", listener, this);
+ requireNonNull(listener);
+ checkArgument(!listeners.contains(listener), "Listener %s is already registered with ShardAccess %s", listener,
+ this);
LOG.debug("Registering LeaderLocationListener {}", listener);
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void onLeaderLocationChanged(@Nonnull final LeaderLocation location) {
+ public void onLeaderLocationChanged(final LeaderLocation location) {
if (closed) {
// we are closed already. Do not dispatch any new leader location
// change events.