--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.pattern.ExplicitAskSupport;
+import akka.util.Timeout;
+import com.google.common.annotations.Beta;
+import java.util.function.Function;
+import scala.Function1;
+import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message.
+ * In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead
+ * of akka.japi.Function.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ExplicitAsk {
+ private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$;
+
+ private ExplicitAsk() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static <T> Function1<ActorRef, T> toScala(final Function<ActorRef, T> function) {
+ return new AbstractFunction1<ActorRef, T>() {
+ @Override
+ public T apply(final ActorRef askSender) {
+ return function.apply(askSender);
+ }
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Future<Object> ask(final ActorRef actor, final Function1<ActorRef, ?> function,
+ final Timeout timeout) {
+ return ASK_SUPPORT.ask(actor, (Function1<ActorRef, Object>)function, timeout);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Future<Object> ask(final ActorSelection actor, final Function1<ActorRef, ?> function,
+ final Timeout timeout) {
+ return ASK_SUPPORT.ask(actor, (Function1<ActorRef, Object>)function, timeout);
+ }
+
+ public static Future<Object> ask(final ActorRef actor, final Function<ActorRef, ?> function, final Timeout timeout) {
+ return ask(actor, toScala(function), timeout);
+ }
+
+ public static Future<Object> ask(final ActorSelection actor, final Function<ActorRef, ?> function,
+ final Timeout timeout) {
+ return ask(actor, toScala(function), timeout);
+ }
+}
import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.pattern.ExplicitAskSupport;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendType;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import scala.Function1;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
-import scala.runtime.AbstractFunction1;
/**
* A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore.
* @author Robert Varga
*/
public final class DistributedDataStoreClientActor extends AbstractClientActor {
- // Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message.
- // In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead
- // of akka.japi.Function.
- private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$;
- private static final Function1<ActorRef, Object> GET_CLIENT_FACTORY = new AbstractFunction1<ActorRef, Object>() {
- @Override
- public Object apply(final ActorRef askSender) {
- return new GetClientRequest(askSender);
- }
- };
+ private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t));
private final ActorContext actorContext;
public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor,
final long timeout, final TimeUnit unit) {
try {
- return (DistributedDataStoreClient) Await.result(ASK_SUPPORT.ask(actor, GET_CLIENT_FACTORY,
+ return (DistributedDataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY,
Timeout.apply(timeout, unit)), Duration.Inf());
} catch (Exception e) {
throw Throwables.propagate(e);
DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
super(context);
- resolver = new ModuleShardBackendResolver(actorContext);
+ resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
-import akka.japi.Function;
-import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Function1;
import scala.compat.java8.FutureConverters;
/**
private final ActorContext actorContext;
// FIXME: this counter should be in superclass somewhere
private final AtomicLong nextSessionId = new AtomicLong();
+ private final Function1<ActorRef, ?> connectFunction;
@GuardedBy("this")
private long nextShard = 1;
private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
- ModuleShardBackendResolver(final ActorContext actorContext) {
+ ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
+ ABIVersion.current()));
}
@Override
FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
LOG.debug("Looking up primary info for {} from {}", shardName, info);
- return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
- (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
- ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+ return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
}).thenApply(response -> {
if (response instanceof RequestFailure) {
final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;