BUG-2138: Make DistributedShardFactory return Futures.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardedDOMDataTree.java
index 0bb6aac36fc86b6285b06ca227592315fb178d66..9c5f5b21df73df9ef89b914401630586bd14bab5 100644 (file)
@@ -16,6 +16,8 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.cluster.Member;
+import akka.dispatch.Mapper;
+import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -29,6 +31,8 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
@@ -64,6 +68,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
+import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
@@ -74,10 +81,13 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
 
-    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
     private static final int MAX_ACTOR_CREATION_RETRIES = 100;
     private static final int ACTOR_RETRY_DELAY = 100;
     private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
+    static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration(
+                    ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3,
+                    TimeUnit.SECONDS);
+    static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);
 
     static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
 
@@ -118,14 +128,14 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         try {
             defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
                     initDefaultShard(LogicalDatastoreType.CONFIGURATION));
-        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+        } catch (final InterruptedException | ExecutionException e) {
             LOG.error("Unable to create default shard frontend for config shard", e);
         }
 
         try {
             defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
                     initDefaultShard(LogicalDatastoreType.OPERATIONAL));
-        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
+        } catch (final InterruptedException | ExecutionException e) {
             LOG.error("Unable to create default shard frontend for operational shard", e);
         }
     }
@@ -163,11 +173,9 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     }
 
     @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    //TODO: it would be better to block here until the message is processed by the actor
-    public DistributedShardRegistration createDistributedShard(
+    public CompletionStage<DistributedShardRegistration> createDistributedShard(
             final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
-            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException {
+            throws DOMDataTreeShardingConflictException {
         final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                 shards.lookup(prefix);
         if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
@@ -175,10 +183,27 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
                     "Prefix " + prefix + " is already occupied by another shard.");
         }
 
-        PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
-        shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender());
-
-        return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this);
+        final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
+
+        final Future<Object> ask =
+                Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT);
+
+        final Future<DistributedShardRegistration> shardRegistrationFuture = ask.transform(
+                new Mapper<Object, DistributedShardRegistration>() {
+                    @Override
+                    public DistributedShardRegistration apply(final Object parameter) {
+                        return new DistributedShardRegistrationImpl(
+                                prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
+                    }
+                },
+                new Mapper<Throwable, Throwable>() {
+                    @Override
+                    public Throwable apply(final Throwable throwable) {
+                        return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable);
+                    }
+                }, actorSystem.dispatcher());
+
+        return FutureConverters.toJava(shardRegistrationFuture);
     }
 
     void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
@@ -248,6 +273,12 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         shards.remove(prefix);
     }
 
+    DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
+            final DOMDataTreeIdentifier prefix) {
+        return shards.lookup(prefix);
+
+    }
+
     DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
         return shardedDOMDataTree.createProducer(prefix);
     }
@@ -287,14 +318,23 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     }
 
     private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
-            throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException {
+            throws ExecutionException, InterruptedException {
         final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
                 Cluster.get(actorSystem).state().members()).asJavaCollection();
         final Collection<MemberName> names = Collections2.transform(members,
             m -> MemberName.forName(m.roles().iterator().next()));
 
-        return createDistributedShard(
-                new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names);
+        try {
+            // we should probably only have one node create the default shards
+            return createDistributedShard(
+                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)
+                    .toCompletableFuture().get();
+        } catch (DOMDataTreeShardingConflictException e) {
+            LOG.debug("Default shard already registered, possibly due to other node doing it faster");
+            return new DistributedShardRegistrationImpl(
+                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
+                    shardedDataTreeActor, this);
+        }
     }
 
     private static void closeProducer(final DOMDataTreeProducer producer) {
@@ -326,7 +366,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
     }
 
-    private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {
+    private class DistributedShardRegistrationImpl implements DistributedShardRegistration {
 
         private final DOMDataTreeIdentifier prefix;
         private final ActorRef shardedDataTreeActor;
@@ -341,11 +381,28 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         }
 
         @Override
-        public void close() {
+        public CompletionStage<Void> close() {
             // first despawn on the local node
             distributedShardedDOMDataTree.despawnShardFrontend(prefix);
             // update the config so the remote nodes are updated
-            shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender());
+            final Future<Object> ask =
+                    Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
+
+            final Future<Void> closeFuture = ask.transform(
+                    new Mapper<Object, Void>() {
+                        @Override
+                        public Void apply(Object parameter) {
+                            return null;
+                        }
+                    },
+                    new Mapper<Throwable, Throwable>() {
+                        @Override
+                        public Throwable apply(Throwable throwable) {
+                            return throwable;
+                        }
+                    }, actorSystem.dispatcher());
+
+            return FutureConverters.toJava(closeFuture);
         }
     }