Merge "Adding more unit tests for remote rpc connector and Integrating routing table"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import akka.japi.Pair;
15 import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
16 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
17 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
18 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
19 import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
20 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
21 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
22 import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
23 import org.opendaylight.controller.remote.rpc.utils.XmlUtils;
24 import org.opendaylight.controller.sal.connector.api.RpcRouter;
25 import org.opendaylight.controller.sal.core.api.Broker;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.util.List;
33 import java.util.concurrent.Future;
34
35 /**
36  * Actor to initiate execution of remote RPC on other nodes of the cluster.
37  */
38
39 public class RpcBroker extends AbstractUntypedActor {
40
41   private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
42   private final Broker.ProviderSession brokerSession;
43   private final ActorRef rpcRegistry;
44   private SchemaContext schemaContext;
45
46   private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){
47     this.brokerSession = brokerSession;
48     this.rpcRegistry = rpcRegistry;
49     this.schemaContext = schemaContext;
50   }
51
52   public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
53     return Props.create(new Creator<RpcBroker>(){
54
55       @Override
56       public RpcBroker create() throws Exception {
57         return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
58       }
59     });
60   }
61   @Override
62   protected void handleReceive(Object message) throws Exception {
63    if(message instanceof InvokeRpc) {
64       invokeRemoteRpc((InvokeRpc) message);
65     } else if(message instanceof ExecuteRpc) {
66       executeRpc((ExecuteRpc) message);
67     }
68   }
69
70   private void invokeRemoteRpc(InvokeRpc msg) {
71     // Look up the remote actor to execute rpc
72     LOG.debug("Looking up the remote actor for route {}", msg);
73     try {
74       // Find router
75       RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
76       RpcRegistry.Messages.FindRouters rpcMsg = new RpcRegistry.Messages.FindRouters(routeId);
77       RpcRegistry.Messages.FindRoutersReply rpcReply =
78           (RpcRegistry.Messages.FindRoutersReply) ActorUtil.executeOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
79
80       List<Pair<ActorRef, Long>> actorRefList = rpcReply.getRouterWithUpdateTime();
81
82       if(actorRefList == null || actorRefList.isEmpty()) {
83         LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc());
84
85         getSender().tell(new ErrorResponse(
86             new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self());
87       } else {
88         RoutingLogic logic = new LatestEntryRoutingLogic(actorRefList);
89
90         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
91         Object operationRes = ActorUtil.executeOperation(logic.select(),
92             executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
93
94         getSender().tell(operationRes, self());
95       }
96     } catch (Exception e) {
97         LOG.error("invokeRemoteRpc: {}", e);
98         getSender().tell(new ErrorResponse(e), self());
99     }
100   }
101
102
103
104   private void executeRpc(ExecuteRpc msg) {
105     LOG.debug("Executing rpc for rpc {}", msg.getRpc());
106     try {
107       Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(),
108           XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
109       RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
110       CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
111       getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
112     } catch (Exception e) {
113       LOG.error("executeRpc: {}", e);
114       getSender().tell(new ErrorResponse(e), self());
115     }
116   }
117
118 }