/*
* Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.cluster.sharding;
import static akka.actor.ActorRef.noSender;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration;
import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
/**
* Default {@link CDSShardAccess} implementation. Listens on leader location
* change events and distributes them to registered listeners. Also updates
* current information about leader location accordingly.
*
*
* Sends a {@link MakeLeaderLocal} message to the local shard and translates its result
* on behalf of callers of {@link #makeLeaderLocal()}.
*
*
* {@link org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer} that
* creates instances of this class has to call {@link #close()} once it is no
* longer valid.
*/
final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener, AutoCloseable {
/** Class-wide SLF4J logger. */
private static final Logger LOG = LoggerFactory.getLogger(CDSShardAccessImpl.class);
/**
 * Backed by a concurrent key set ({@link ConcurrentHashMap#newKeySet()}).
 * NOTE(review): presumably holds the registered leader location listeners - the code that
 * adds to and iterates over it is outside this excerpt.
 */
private final Collection listeners = ConcurrentHashMap.newKeySet();
/** Shard identifier supplied to the constructor; returned by {@link #getShardIdentifier()}. */
private final DOMDataTreeIdentifier prefix;
/** Actor context supplied to the constructor; used to look up the local shard. */
private final ActorContext actorContext;
/**
 * Twice the configured shard leader election timeout, computed in the constructor.
 * NOTE(review): presumably bounds the MakeLeaderLocal request - its use is outside this excerpt.
 */
private final Timeout makeLeaderLocalTimeout;
/** Actor created in the constructor from {@code RoleChangeListenerActor.props(localShard, this)}. */
private ActorRef roleChangeListenerActor;
/** Value returned by {@link #getLeaderLocation()}; starts out as {@link LeaderLocation#UNKNOWN}. */
private volatile LeaderLocation currentLeader = LeaderLocation.UNKNOWN;
/** When {@code true}, {@code checkNotClosed()} rejects further calls with an IllegalStateException. */
private volatile boolean closed = false;
/**
 * Creates the shard access object for the shard identified by {@code prefix} and registers a
 * {@code RoleChangeListenerActor} for the corresponding local shard, handing this instance
 * to that actor.
 *
 * @param prefix identifier of the shard this object provides access to, must not be null
 * @param actorContext context used to find the local shard and to create the listener actor,
 *                     must not be null
 * @throws NullPointerException if {@code prefix} or {@code actorContext} is null
 * @throws IllegalStateException if no local shard is found for {@code prefix}
 */
CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorContext actorContext) {
    this.prefix = Preconditions.checkNotNull(prefix);
    this.actorContext = Preconditions.checkNotNull(actorContext);
    // The timeout is twice the configured shard leader election timeout.
    this.makeLeaderLocalTimeout =
        new Timeout(actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2));

    // register RoleChangeListenerActor
    // TODO Maybe we should do this asynchronously
    final Optional<ActorRef> localShardReply =
        actorContext.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
    // Guava's Preconditions only substitutes "%s"; an SLF4J-style "{}" would be printed
    // literally with the argument appended in square brackets.
    Preconditions.checkState(localShardReply.isPresent(),
        "Local shard for %s not present. Cannot register RoleChangeListenerActor", prefix);
    roleChangeListenerActor =
        actorContext.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this));
}
/**
 * Rejects the current call once the {@code closed} flag has been set.
 *
 * @throws IllegalStateException if {@code closed} is {@code true}
 */
private void checkNotClosed() {
    if (closed) {
        throw new IllegalStateException(
            "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid");
    }
}
/**
 * Returns the shard identifier this instance was constructed with.
 *
 * @return the {@code prefix} passed to the constructor
 * @throws IllegalStateException if this instance has been marked closed
 */
@Nonnull
@Override
public DOMDataTreeIdentifier getShardIdentifier() {
    checkNotClosed();
    return this.prefix;
}
/**
 * Returns the most recently recorded leader location.
 *
 * @return the current value of {@code currentLeader}, which starts out as
 *         {@link LeaderLocation#UNKNOWN}
 * @throws IllegalStateException if this instance has been marked closed
 */
@Nonnull
@Override
public LeaderLocation getLeaderLocation() {
    checkNotClosed();
    // TODO the field is initialised to UNKNOWN, so callers see UNKNOWN until the first
    // notification from roleChangeListenerActor updates it
    final LeaderLocation recordedLocation = currentLeader;
    return recordedLocation;
}
@Override
@Nonnull
public CompletionStage makeLeaderLocal() {
// TODO when we have running make leader local operation
// we should just return the same completion stage
checkNotClosed();
// TODO can we cache local shard actorRef?
final Future localShardReply =
actorContext.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
// we have to tell local shard to make leader local
final scala.concurrent.Promise