Adding more unit tests for remote rpc connector and Integrating routing table
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcBroker.java
index 26e8e960e3ce46d5f9ea9b3047160134a1f2df36..611618f1f69b106d85a0959188646252fc89de60 100644 (file)
@@ -11,17 +11,17 @@ package org.opendaylight.controller.remote.rpc;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+import akka.japi.Pair;
 import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
 import org.opendaylight.controller.remote.rpc.utils.XmlUtils;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
@@ -29,6 +29,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.Future;
 
 /**
@@ -59,81 +60,57 @@ public class RpcBroker extends AbstractUntypedActor {
   }
   @Override
   protected void handleReceive(Object message) throws Exception {
-    if(message instanceof InvokeRoutedRpc) {
-      invokeRemoteRoutedRpc((InvokeRoutedRpc) message);
-    } else if(message instanceof InvokeRpc) {
+   if(message instanceof InvokeRpc) {
       invokeRemoteRpc((InvokeRpc) message);
     } else if(message instanceof ExecuteRpc) {
       executeRpc((ExecuteRpc) message);
     }
   }
 
-  private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) {
-    // Look up the remote actor to execute rpc
-    LOG.debug("Looking up the remote actor for route {}", msg);
-    try {
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
-      GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId);
-      GetRoutedRpcReply rpcReply = (GetRoutedRpcReply) ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
-
-      String remoteActorPath = rpcReply.getRoutePath();
-      if(remoteActorPath == null) {
-        LOG.debug("No remote actor found for rpc execution.");
-
-        getSender().tell(new ErrorResponse(
-          new IllegalStateException("No remote actor found for rpc execution.")), self());
-      } else {
-
-        ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
-
-        Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
-            executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
-
-        getSender().tell(operationRes, self());
-      }
-    } catch (Exception e) {
-        LOG.error(e.toString());
-        getSender().tell(new ErrorResponse(e), self());
-    }
-  }
-
   private void invokeRemoteRpc(InvokeRpc msg) {
     // Look up the remote actor to execute rpc
     LOG.debug("Looking up the remote actor for route {}", msg);
     try {
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null);
-      GetRpc rpcMsg = new GetRpc(routeId);
-      GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
-      String remoteActorPath = rpcReply.getRoutePath();
+      // Find router
+      RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
+      RpcRegistry.Messages.FindRouters rpcMsg = new RpcRegistry.Messages.FindRouters(routeId);
+      RpcRegistry.Messages.FindRoutersReply rpcReply =
+          (RpcRegistry.Messages.FindRoutersReply) ActorUtil.executeOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+
+      List<Pair<ActorRef, Long>> actorRefList = rpcReply.getRouterWithUpdateTime();
 
-      if(remoteActorPath == null) {
+      if(actorRefList == null || actorRefList.isEmpty()) {
         LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc());
 
         getSender().tell(new ErrorResponse(
-          new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self());
+            new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self());
       } else {
+        RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
 
         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
-        Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
+        Object operationRes = ActorUtil.executeOperation(logic.select(),
             executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
 
         getSender().tell(operationRes, self());
       }
     } catch (Exception e) {
-        LOG.error(e.toString());
+        LOG.error("invokeRemoteRpc: {}", e);
         getSender().tell(new ErrorResponse(e), self());
     }
   }
 
+
+
   private void executeRpc(ExecuteRpc msg) {
     LOG.debug("Executing rpc for rpc {}", msg.getRpc());
     try {
-      Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
+      Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(),
+          XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
       RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
       CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
       getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
     } catch (Exception e) {
-      LOG.error(e.toString());
+      LOG.error("executeRpc: {}", e);
       getSender().tell(new ErrorResponse(e), self());
     }
   }