BUG-5280: add ExplicitAsk utility class 54/44654/1
authorRobert Varga <rovarga@cisco.com>
Thu, 25 Aug 2016 09:31:24 +0000 (11:31 +0200)
committerRobert Varga <rovarga@cisco.com>
Thu, 25 Aug 2016 10:40:48 +0000 (12:40 +0200)
Akka's support for Explicit Ask with Java functions
is broken and requires a workaround. This patch moves
the previous implementation site to a reusable place
and fixes a caller which was broken.

Change-Id: Idc0fc961b1808c23e01a4ae8c4eafdc93b7974f2
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExplicitAsk.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java

diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExplicitAsk.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExplicitAsk.java
new file mode 100644 (file)
index 0000000..c1f7c8b
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.pattern.ExplicitAskSupport;
+import akka.util.Timeout;
+import com.google.common.annotations.Beta;
+import java.util.function.Function;
+import scala.Function1;
+import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message.
+ * In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead
+ * of akka.japi.Function.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ExplicitAsk {
+    private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$;
+
+    private ExplicitAsk() {
+        throw new UnsupportedOperationException();
+    }
+
+    public static <T> Function1<ActorRef, T> toScala(final Function<ActorRef, T> function) {
+        return new AbstractFunction1<ActorRef, T>() {
+            @Override
+            public T apply(final ActorRef askSender) {
+                return function.apply(askSender);
+            }
+        };
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Future<Object> ask(final ActorRef actor, final Function1<ActorRef, ?> function,
+            final Timeout timeout) {
+        return ASK_SUPPORT.ask(actor, (Function1<ActorRef, Object>)function, timeout);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Future<Object> ask(final ActorSelection actor, final Function1<ActorRef, ?> function,
+            final Timeout timeout) {
+        return ASK_SUPPORT.ask(actor, (Function1<ActorRef, Object>)function, timeout);
+    }
+
+    public static Future<Object> ask(final ActorRef actor, final Function<ActorRef, ?> function, final Timeout timeout) {
+        return ask(actor, toScala(function), timeout);
+    }
+
+    public static Future<Object> ask(final ActorSelection actor, final Function<ActorRef, ?> function,
+            final Timeout timeout) {
+        return ask(actor, toScala(function), timeout);
+    }
+}
index b0c518d41f5042a9bd4590f115921eecc9bdd44b..de2c19572c1bdec1ae03b510000a5fa3ea5c3591 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.pattern.ExplicitAskSupport;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -20,11 +19,11 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import scala.Function1;
 import scala.concurrent.Await;
 import scala.concurrent.duration.Duration;
-import scala.runtime.AbstractFunction1;
 
 /**
  * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore.
@@ -32,16 +31,7 @@ import scala.runtime.AbstractFunction1;
  * @author Robert Varga
  */
 public final class DistributedDataStoreClientActor extends AbstractClientActor {
-    // Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message.
-    // In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead
-    // of akka.japi.Function.
-    private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$;
-    private static final Function1<ActorRef, Object> GET_CLIENT_FACTORY = new AbstractFunction1<ActorRef, Object>() {
-        @Override
-        public Object apply(final ActorRef askSender) {
-            return new GetClientRequest(askSender);
-        }
-    };
+    private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t));
 
     private final ActorContext actorContext;
 
@@ -65,7 +55,7 @@ public final class DistributedDataStoreClientActor extends AbstractClientActor {
     public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor,
             final long timeout, final TimeUnit unit) {
         try {
-            return (DistributedDataStoreClient) Await.result(ASK_SUPPORT.ask(actor, GET_CLIENT_FACTORY,
+            return (DistributedDataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY,
                 Timeout.apply(timeout, unit)), Duration.Inf());
         } catch (Exception e) {
             throw Throwables.propagate(e);
index dd4f1aadc6ef702ac61949d17cc6ada2a37b3630..e3e781e4db8eb0d75488b29059ab555b37afa125 100644 (file)
@@ -65,7 +65,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
 
     DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
         super(context);
-        resolver = new ModuleShardBackendResolver(actorContext);
+        resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
     }
 
index 359b428c99d5060f59baa45ca2414a34eba46f98..249cd4524d5d1f2ea1b925008c035e850abb8a5e 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
-import akka.japi.Function;
-import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -27,12 +25,15 @@ import org.opendaylight.controller.cluster.access.client.BackendInfo;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Function1;
 import scala.compat.java8.FutureConverters;
 
 /**
@@ -57,6 +58,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private final ActorContext actorContext;
     // FIXME: this counter should be in superclass somewhere
     private final AtomicLong nextSessionId = new AtomicLong();
+    private final Function1<ActorRef, ?> connectFunction;
 
     @GuardedBy("this")
     private long nextShard = 1;
@@ -64,8 +66,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
-    ModuleShardBackendResolver(final ActorContext actorContext) {
+    ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
+            ABIVersion.current()));
     }
 
     @Override
@@ -112,9 +116,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
 
         FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
             LOG.debug("Looking up primary info for {} from {}", shardName, info);
-            return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
-                (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
-                    ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+            return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
         }).thenApply(response -> {
             if (response instanceof RequestFailure) {
                 final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;