import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
+import akka.dispatch.Mapper;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
}
/**
- * Finds a local shard given it's shard name and return it's ActorRef
+ * Finds a local shard given its shard name and return it's ActorRef
*
* @param shardName the name of the local shard that needs to be found
* @return a reference to a local shard actor which represents the shard
* specified by the shardName
*/
public Optional<ActorRef> findLocalShard(String shardName) {
- Object result = executeOperation(shardManager, new FindLocalShard(shardName));
+ Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
return Optional.absent();
}
+ /**
+ * Finds a local shard async given its shard name and return a Future from which to obtain the
+ * ActorRef.
+ *
+ * @param shardName the name of the local shard that needs to be found
+ */
+ public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindLocalShard(shardName, true), timeout);
+
+ return future.map(new Mapper<Object, ActorRef>() {
+ @Override
+ public ActorRef checkedApply(Object response) throws Throwable {
+ if(response instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound)response;
+ LOG.debug("Local shard found {}", found.getPath());
+ return found.getPath();
+ } else if(response instanceof ActorNotInitialized) {
+ throw new NotInitializedException(
+ String.format("Found local shard for %s but it's not initialized yet.",
+ shardName));
+ } else if(response instanceof LocalShardNotFound) {
+ throw new LocalShardNotFoundException(
+ String.format("Local shard for %s does not exist.", shardName));
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindLocalShard returned unkown response: %s", response));
+ }
+ }, getActorSystem().dispatcher());
+ }
private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
+ Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);