X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2FRpcBroker.java;h=31aac92051f1ee6948cbefcf4c68e49d926c5da0;hp=2aca655d2628eb9d89295d09419d0cd44f7491d7;hb=b09693b984ad221a7effeeec0d72ef20be3c4e65;hpb=c33b2b55b2eae406df001619885a0610800cb951 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 2aca655d26..31aac92051 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -8,25 +8,29 @@ package org.opendaylight.controller.remote.rpc; -import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.actor.Props; import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Pair; -import akka.util.Timeout; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; + +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; +import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; -import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -36,16 +40,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.ListenableFuture; - import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.Future; +import static akka.pattern.Patterns.ask; + /** * Actor to initiate execution of remote RPC on other nodes of the cluster. */ @@ -55,13 +56,15 @@ public class RpcBroker extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class); private final Broker.ProviderSession brokerSession; private final ActorRef rpcRegistry; - private final SchemaContext schemaContext; + private SchemaContext schemaContext; + private final RemoteRpcProviderConfig config; private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext) { this.brokerSession = brokerSession; this.rpcRegistry = rpcRegistry; this.schemaContext = schemaContext; + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); } public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, @@ -75,18 +78,24 @@ public class RpcBroker extends AbstractUntypedActor { invokeRemoteRpc((InvokeRpc) message); } else if(message instanceof ExecuteRpc) { executeRpc((ExecuteRpc) message); + } else if(message instanceof UpdateSchemaContext) { + updateSchemaContext((UpdateSchemaContext) message); } } - private void invokeRemoteRpc(final InvokeRpc msg) { - LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); + private void updateSchemaContext(UpdateSchemaContext message) { + this.schemaContext = message.getSchemaContext(); + } + private void invokeRemoteRpc(final InvokeRpc msg) { + if(LOG.isDebugEnabled()) { + LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); + } RpcRouter.RouteIdentifier routeId = new RouteIdentifierImpl( null, msg.getRpc(), msg.getIdentifier()); RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); - scala.concurrent.Future future = ask(rpcRegistry, findMsg, - new Timeout(ActorUtil.LOCAL_ASK_DURATION)); + scala.concurrent.Future future = ask(rpcRegistry, findMsg, config.getAskDuration()); final ActorRef sender = getSender(); final ActorRef self = self(); @@ -129,8 +138,7 @@ public class RpcBroker extends AbstractUntypedActor { ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); - scala.concurrent.Future future = ask(logic.select(), executeMsg, - new Timeout(ActorUtil.REMOTE_ASK_DURATION)); + scala.concurrent.Future future = ask(logic.select(), executeMsg, config.getAskDuration()); OnComplete onComplete = new OnComplete() { @Override @@ -149,8 +157,9 @@ public class RpcBroker extends AbstractUntypedActor { } private void executeRpc(final ExecuteRpc msg) { - LOG.debug("Executing rpc {}", msg.getRpc()); - + if(LOG.isDebugEnabled()) { + LOG.debug("Executing rpc {}", msg.getRpc()); + } Future> future = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));