From 2ebf9ef718ea7ddd790784a6d241e68ef8d1c564 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 25 Aug 2016 11:31:24 +0200 Subject: [PATCH] BUG-5280: add ExplicitAsk utility class Akka's support for Explicit Ask with Java functions is broken and requires a workaround. This patch moves the previous implementation site to a reusable place and fixes a caller which was broken. Change-Id: Idc0fc961b1808c23e01a4ae8c4eafdc93b7974f2 Signed-off-by: Robert Varga --- .../cluster/common/actor/ExplicitAsk.java | 64 +++++++++++++++++++ .../dds/DistributedDataStoreClientActor.java | 16 +---- .../DistributedDataStoreClientBehavior.java | 2 +- .../dds/ModuleShardBackendResolver.java | 14 ++-- 4 files changed, 76 insertions(+), 20 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExplicitAsk.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExplicitAsk.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExplicitAsk.java new file mode 100644 index 0000000000..c1f7c8b2e5 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/ExplicitAsk.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.common.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.pattern.ExplicitAskSupport; +import akka.util.Timeout; +import com.google.common.annotations.Beta; +import java.util.function.Function; +import scala.Function1; +import scala.concurrent.Future; +import scala.runtime.AbstractFunction1; + +/** + * Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message. + * In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead + * of akka.japi.Function. + * + * @author Robert Varga + */ +@Beta +public final class ExplicitAsk { + private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$; + + private ExplicitAsk() { + throw new UnsupportedOperationException(); + } + + public static Function1 toScala(final Function function) { + return new AbstractFunction1() { + @Override + public T apply(final ActorRef askSender) { + return function.apply(askSender); + } + }; + } + + @SuppressWarnings("unchecked") + public static Future ask(final ActorRef actor, final Function1 function, + final Timeout timeout) { + return ASK_SUPPORT.ask(actor, (Function1)function, timeout); + } + + @SuppressWarnings("unchecked") + public static Future ask(final ActorSelection actor, final Function1 function, + final Timeout timeout) { + return ASK_SUPPORT.ask(actor, (Function1)function, timeout); + } + + public static Future ask(final ActorRef actor, final Function function, final Timeout timeout) { + return ask(actor, toScala(function), timeout); + } + + public static Future ask(final ActorSelection actor, final Function function, + final Timeout timeout) { + return ask(actor, toScala(function), timeout); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java index b0c518d41f..de2c19572c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import akka.actor.Props; -import akka.pattern.ExplicitAskSupport; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -20,11 +19,11 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import scala.Function1; import scala.concurrent.Await; import scala.concurrent.duration.Duration; -import scala.runtime.AbstractFunction1; /** * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore. @@ -32,16 +31,7 @@ import scala.runtime.AbstractFunction1; * @author Robert Varga */ public final class DistributedDataStoreClientActor extends AbstractClientActor { - // Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message. - // In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead - // of akka.japi.Function. - private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$; - private static final Function1 GET_CLIENT_FACTORY = new AbstractFunction1() { - @Override - public Object apply(final ActorRef askSender) { - return new GetClientRequest(askSender); - } - }; + private static final Function1 GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t)); private final ActorContext actorContext; @@ -65,7 +55,7 @@ public final class DistributedDataStoreClientActor extends AbstractClientActor { public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor, final long timeout, final TimeUnit unit) { try { - return (DistributedDataStoreClient) Await.result(ASK_SUPPORT.ask(actor, GET_CLIENT_FACTORY, + return (DistributedDataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY, Timeout.apply(timeout, unit)), Duration.Inf()); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index dd4f1aadc6..e3e781e4db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -65,7 +65,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { super(context); - resolver = new ModuleShardBackendResolver(actorContext); + resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext); singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 359b428c99..249cd4524d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; -import akka.japi.Function; -import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -27,12 +25,15 @@ import org.opendaylight.controller.cluster.access.client.BackendInfo; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Function1; import scala.compat.java8.FutureConverters; /** @@ -57,6 +58,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver connectFunction; @GuardedBy("this") private long nextShard = 1; @@ -64,8 +66,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); // FIXME: we really need just ActorContext.findPrimaryShardAsync() - ModuleShardBackendResolver(final ActorContext actorContext) { + ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { this.actorContext = Preconditions.checkNotNull(actorContext); + this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON, + ABIVersion.current())); } @Override @@ -112,9 +116,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver { LOG.debug("Looking up primary info for {} from {}", shardName, info); - return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(), - (Function) replyTo -> new ConnectClientRequest(null, replyTo, - ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT)); + return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); }).thenApply(response -> { if (response instanceof RequestFailure) { final RequestFailure failure = (RequestFailure) response; -- 2.36.6