Merge "Added hosttracker shell for karaf (rebased)"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
15 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
16 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
17 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
18 import org.opendaylight.controller.remote.rpc.messages.GetRpc;
19 import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
20 import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
21 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
22 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
23 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
24 import org.opendaylight.controller.remote.rpc.utils.XmlUtils;
25 import org.opendaylight.controller.sal.core.api.Broker;
26 import org.opendaylight.yangtools.yang.common.RpcResult;
27 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
28 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.util.concurrent.Future;
33
34 /**
35  * Actor to initiate execution of remote RPC on other nodes of the cluster.
36  */
37
38 public class RpcBroker extends AbstractUntypedActor {
39
40   private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
41   private final Broker.ProviderSession brokerSession;
42   private final ActorRef rpcRegistry;
43   private SchemaContext schemaContext;
44
45   private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){
46     this.brokerSession = brokerSession;
47     this.rpcRegistry = rpcRegistry;
48     this.schemaContext = schemaContext;
49   }
50
51   public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
52     return Props.create(new Creator<RpcBroker>(){
53
54       @Override
55       public RpcBroker create() throws Exception {
56         return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
57       }
58     });
59   }
60   @Override
61   protected void handleReceive(Object message) throws Exception {
62     if(message instanceof InvokeRoutedRpc) {
63       invokeRemoteRoutedRpc((InvokeRoutedRpc) message);
64     } else if(message instanceof InvokeRpc) {
65       invokeRemoteRpc((InvokeRpc) message);
66     } else if(message instanceof ExecuteRpc) {
67       executeRpc((ExecuteRpc) message);
68     }
69   }
70
71   private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) {
72     // Look up the remote actor to execute rpc
73     LOG.debug("Looking up the remote actor for route {}", msg);
74     try {
75       RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
76       GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId);
77       GetRoutedRpcReply rpcReply = (GetRoutedRpcReply) ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
78
79       String remoteActorPath = rpcReply.getRoutePath();
80       if(remoteActorPath == null) {
81         LOG.debug("No remote actor found for rpc execution.");
82
83         getSender().tell(new ErrorResponse(
84           new IllegalStateException("No remote actor found for rpc execution.")), self());
85       } else {
86
87         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
88
89         Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
90             executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
91
92         getSender().tell(operationRes, self());
93       }
94     } catch (Exception e) {
95         LOG.error(e.toString());
96         getSender().tell(new ErrorResponse(e), self());
97     }
98   }
99
100   private void invokeRemoteRpc(InvokeRpc msg) {
101     // Look up the remote actor to execute rpc
102     LOG.debug("Looking up the remote actor for route {}", msg);
103     try {
104       RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null);
105       GetRpc rpcMsg = new GetRpc(routeId);
106       GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
107       String remoteActorPath = rpcReply.getRoutePath();
108
109       if(remoteActorPath == null) {
110         LOG.debug("No remote actor found for rpc {{}}.", msg.getRpc());
111
112         getSender().tell(new ErrorResponse(
113           new IllegalStateException("No remote actor found for rpc execution of : " + msg.getRpc())), self());
114       } else {
115
116         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
117         Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
118             executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
119
120         getSender().tell(operationRes, self());
121       }
122     } catch (Exception e) {
123         LOG.error(e.toString());
124         getSender().tell(new ErrorResponse(e), self());
125     }
126   }
127
128   private void executeRpc(ExecuteRpc msg) {
129     LOG.debug("Executing rpc for rpc {}", msg.getRpc());
130     try {
131       Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
132       RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
133       CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
134       getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
135     } catch (Exception e) {
136       LOG.error(e.toString());
137       getSender().tell(new ErrorResponse(e), self());
138     }
139   }
140
141 }