From: Tony Tkacik Date: Tue, 9 Jun 2015 16:59:16 +0000 (+0200) Subject: Remote RPC Broker: Make Futures non-blocking. X-Git-Tag: release/lithium~44 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c89c9a5f25ea53a002a3cd0672f6e86b7a876f05 Remote RPC Broker: Make Futures non-blocking. Change-Id: I06eedf5eea33458f88b3bad94afcc8c1d85a9c19 Signed-off-by: Tony Tkacik --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcException.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcException.java new file mode 100644 index 0000000000..b0e9d4e786 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcException.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; + +class RemoteDOMRpcException extends DOMRpcException { + + private static final long serialVersionUID = 1L; + + public RemoteDOMRpcException(final String message,final Throwable cause) { + super(message,cause); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java new file mode 100644 index 0000000000..e97a499fdb --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import akka.dispatch.OnComplete; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; + +class RemoteDOMRpcFuture extends AbstractFuture implements CheckedFuture { + + private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class); + + private RemoteDOMRpcFuture(final Future future) { + future.onComplete(new FutureUpdater(), ExecutionContext.Implicits$.MODULE$.global()); + } + + public static CheckedFuture from(final Future future) { + return new RemoteDOMRpcFuture(future); + } + + @Override + public DOMRpcResult checkedGet() throws DOMRpcException { + try { + return get(); + } catch (final ExecutionException e) { + throw mapException(e); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } + } + + @Override + public DOMRpcResult checkedGet(final long timeout, final TimeUnit unit) throws TimeoutException, DOMRpcException { + try { + return get(timeout, unit); + } catch (final ExecutionException e) { + throw mapException(e); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private DOMRpcException mapException(final ExecutionException e) { + final Throwable cause = e.getCause(); + if (cause instanceof DOMRpcException) { + return (DOMRpcException) cause; + } + return new RemoteDOMRpcException("Exception during invoking RPC", e); + } + + private final class FutureUpdater extends OnComplete { + + @Override + public void onComplete(final Throwable error, final Object reply) throws Throwable { + if (error != null) { + RemoteDOMRpcFuture.this.setException(error); + } else if (reply instanceof RpcResponse) { + final RpcResponse rpcReply = (RpcResponse) reply; + final NormalizedNode result; + if (rpcReply.getResultNormalizedNode() == null) { + result = null; + LOG.debug("Received response for invoke rpc: result is null"); + } else { + result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode()); + LOG.debug("Received response for invoke rpc: result is {}", result); + } + RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result)); + } + RemoteDOMRpcFuture.this.setException(new IllegalStateException("Incorrect reply type " + reply + + "from Akka")); + } + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 2a4ea2e3e6..404a109741 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -1,35 +1,17 @@ package org.opendaylight.controller.remote.rpc; import static akka.pattern.Patterns.ask; + import akka.actor.ActorRef; -import akka.dispatch.OnComplete; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.ExecutionException; -import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; -import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; public class RemoteRpcImplementation implements DOMRpcImplementation { - private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); private final ActorRef rpcBroker; private final RemoteRpcProviderConfig config; @@ -41,68 +23,7 @@ public class RemoteRpcImplementation implements DOMRpcImplementation { @Override public CheckedFuture invokeRpc(final DOMRpcIdentifier rpc, final NormalizedNode input) { final InvokeRpc rpcMsg = new InvokeRpc(rpc.getType().getLastComponent(), rpc.getContextReference(), input); - - final SettableFuture settableFuture = SettableFuture.create(); - - final ListenableFuture listenableFuture = - JdkFutureAdapters.listenInPoolThread(settableFuture); - final scala.concurrent.Future future = ask(rpcBroker, rpcMsg, config.getAskDuration()); - - final OnComplete onComplete = new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object reply) throws Throwable { - if(failure != null) { - - // When we return a failure to the caller they can choose to log it if they like - // so here we just do basic warn logging by default and log the stack trace only when debug - // is enabled - - LOG.warn("InvokeRpc failed rpc = {}, identifier = {}", rpcMsg.getRpc(), rpcMsg.getIdentifier()); - - if(LOG.isDebugEnabled()){ - LOG.debug("Detailed Error", failure); - } - - final String message = String.format("Execution of RPC %s failed because of %s", - rpcMsg.getRpc(), failure.getMessage()); - Collection errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message)); - if(failure instanceof RpcErrorsException) { - errors = ((RpcErrorsException) failure).getRpcErrors(); - if (errors == null || errors.size() == 0) { - errors = Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, null, message)); - } - } - final DOMRpcResult rpcResult = new DefaultDOMRpcResult(errors); - - settableFuture.set(rpcResult); - return; - } - - final RpcResponse rpcReply = (RpcResponse)reply; - final NormalizedNode result; - - if(rpcReply.getResultNormalizedNode() == null){ - result = null; - LOG.debug("Received response for invoke rpc : {} result is null", rpcMsg.getRpc()); - } else { - result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode()); - LOG.debug("Received response for invoke rpc : {} result : {}", rpcMsg.getRpc(), result); - } - - settableFuture.set(new DefaultDOMRpcResult(result)); - } - }; - - - future.onComplete(onComplete, ExecutionContext.Implicits$.MODULE$.global()); - // FIXME find non blocking way for implementation - try { - return Futures.immediateCheckedFuture(listenableFuture.get()); - } - catch (InterruptedException | ExecutionException e) { - LOG.debug("Unexpected remote RPC exception.", e); - return Futures.immediateFailedCheckedFuture((DOMRpcException) new DOMRpcImplementationNotAvailableException(e, "Unexpected remote RPC exception")); - } + return RemoteDOMRpcFuture.from(future); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java index d17242ed60..83101a77b8 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 4dee5dabb7..d88bb88017 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.remote.rpc; import static akka.pattern.Patterns.ask; + import akka.actor.ActorRef; import akka.actor.Props; import akka.dispatch.OnComplete; @@ -26,6 +27,7 @@ import java.util.List; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; @@ -104,14 +106,10 @@ public class RpcBroker extends AbstractUntypedActor { final List> actorRefList = findReply.getRouterWithUpdateTime(); if(actorRefList == null || actorRefList.isEmpty()) { - final String message = String.format( - "No remote implementation found for rpc %s", msg.getRpc()); - sender.tell(new akka.actor.Status.Failure(new RpcErrorsException( - message, Arrays.asList(RpcResultBuilder.newError(ErrorType.RPC, - "operation-not-supported", message)))), self); + sender.tell(new akka.actor.Status.Failure(new DOMRpcImplementationNotAvailableException( + "No remote implementation available for rpc %s", msg.getRpc())), self); return; } - finishInvokeRpc(actorRefList, msg, sender, self); } };