X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2FRpcBroker.java;h=2046e419d9f2602b444becf6986fe2c73bb9756e;hp=2aca655d2628eb9d89295d09419d0cd44f7491d7;hb=531621aac4cff9d39cbd8668a53bdeba8a0e6d81;hpb=7f8512fcbe4ac373995b7e2e370d38a01f4eaeec diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 2aca655d26..2046e419d9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -8,25 +8,26 @@ package org.opendaylight.controller.remote.rpc; -import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.actor.Props; import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Pair; -import akka.util.Timeout; - +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; -import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -36,16 +37,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.ListenableFuture; - import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.Future; +import static akka.pattern.Patterns.ask; + /** * Actor to initiate execution of remote RPC on other nodes of the cluster. */ @@ -56,12 +54,14 @@ public class RpcBroker extends AbstractUntypedActor { private final Broker.ProviderSession brokerSession; private final ActorRef rpcRegistry; private final SchemaContext schemaContext; + private final RemoteRpcProviderConfig config; private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext) { this.brokerSession = brokerSession; this.rpcRegistry = rpcRegistry; this.schemaContext = schemaContext; + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); } public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, @@ -79,14 +79,14 @@ public class RpcBroker extends AbstractUntypedActor { } private void invokeRemoteRpc(final InvokeRpc msg) { - LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); + } RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl( null, msg.getRpc(), msg.getIdentifier()); RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); - scala.concurrent.Future future = ask(rpcRegistry, findMsg, - new Timeout(ActorUtil.LOCAL_ASK_DURATION)); + scala.concurrent.Future future = ask(rpcRegistry, findMsg, config.getAskDuration()); final ActorRef sender = getSender(); final ActorRef self = self(); @@ -129,8 +129,7 @@ public class RpcBroker extends AbstractUntypedActor { ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); - scala.concurrent.Future future = ask(logic.select(), executeMsg, - new Timeout(ActorUtil.REMOTE_ASK_DURATION)); + scala.concurrent.Future future = ask(logic.select(), executeMsg, config.getAskDuration()); OnComplete onComplete = new OnComplete() { @Override @@ -149,8 +148,9 @@ public class RpcBroker extends AbstractUntypedActor { } private void executeRpc(final ExecuteRpc msg) { - LOG.debug("Executing rpc {}", msg.getRpc()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Executing rpc {}", msg.getRpc()); + } Future> future = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));