package org.opendaylight.controller.remote.rpc;
-import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Pair;
-import akka.util.Timeout;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
+import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
-import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
+import static akka.pattern.Patterns.ask;
+
/**
* Actor to initiate execution of remote RPC on other nodes of the cluster.
*/
private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
private final Broker.ProviderSession brokerSession;
private final ActorRef rpcRegistry;
- private final SchemaContext schemaContext;
+ private SchemaContext schemaContext;
+ private final RemoteRpcProviderConfig config;
private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
SchemaContext schemaContext) {
this.brokerSession = brokerSession;
this.rpcRegistry = rpcRegistry;
this.schemaContext = schemaContext;
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
invokeRemoteRpc((InvokeRpc) message);
} else if(message instanceof ExecuteRpc) {
executeRpc((ExecuteRpc) message);
+ } else if(message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
}
}
- private void invokeRemoteRpc(final InvokeRpc msg) {
- LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+ private void updateSchemaContext(UpdateSchemaContext message) {
+ this.schemaContext = message.getSchemaContext();
+ }
+ private void invokeRemoteRpc(final InvokeRpc msg) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
+ }
RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(
null, msg.getRpc(), msg.getIdentifier());
RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
- scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg,
- new Timeout(ActorUtil.LOCAL_ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
final ActorRef sender = getSender();
final ActorRef self = self();
ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
schemaContext), msg.getRpc());
- scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg,
- new Timeout(ActorUtil.REMOTE_ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
}
private void executeRpc(final ExecuteRpc msg) {
- LOG.debug("Executing rpc {}", msg.getRpc());
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Executing rpc {}", msg.getRpc());
+ }
Future<RpcResult<CompositeNode>> future = brokerSession.rpc(msg.getRpc(),
XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(),
schemaContext));