Merge "Created Network Service Functions Features"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
15 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
16 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
17 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
18 import org.opendaylight.controller.remote.rpc.messages.GetRpc;
19 import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
20 import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
21 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
22 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
23 import org.opendaylight.controller.sal.core.api.Broker;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
26 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import java.util.concurrent.Future;
31
32 /**
33  * Actor to initiate execution of remote RPC on other nodes of the cluster.
34  */
35
36 public class RpcBroker extends AbstractUntypedActor {
37
38   private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
39   private final Broker.ProviderSession brokerSession;
40   private final ActorRef rpcRegistry;
41   private final SchemaContext schemaContext;
42
43   private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){
44     this.brokerSession = brokerSession;
45     this.rpcRegistry = rpcRegistry;
46     this.schemaContext = schemaContext;
47   }
48
49   public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
50     return Props.create(new Creator<RpcBroker>(){
51
52       @Override
53       public RpcBroker create() throws Exception {
54         return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
55       }
56     });
57   }
58   @Override
59   protected void handleReceive(Object message) throws Exception {
60     if(message instanceof InvokeRoutedRpc) {
61       invokeRemoteRoutedRpc((InvokeRoutedRpc) message);
62     } else if(message instanceof InvokeRpc) {
63       invokeRemoteRpc((InvokeRpc) message);
64     } else if(message instanceof ExecuteRpc) {
65       executeRpc((ExecuteRpc) message);
66     }
67   }
68
69   private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) {
70     // Look up the remote actor to execute rpc
71     LOG.debug("Looking up the remote actor for route {}", msg);
72     try {
73       RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
74       GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId);
75       GetRoutedRpcReply rpcReply = (GetRoutedRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
76
77       String remoteActorPath = rpcReply.getRoutePath();
78       if(remoteActorPath == null) {
79         LOG.debug("No remote actor found for rpc execution.");
80
81         getSender().tell(new ErrorResponse(
82           new IllegalStateException("No remote actor found for rpc execution.")), self());
83       } else {
84
85         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
86
87         Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
88             executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
89
90         getSender().tell(operationRes, self());
91       }
92     } catch (Exception e) {
93         LOG.error(e.toString());
94         getSender().tell(new ErrorResponse(e), self());
95     }
96   }
97
98   private void invokeRemoteRpc(InvokeRpc msg) {
99     // Look up the remote actor to execute rpc
100     LOG.debug("Looking up the remote actor for route {}", msg);
101     try {
102       RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null);
103       GetRpc rpcMsg = new GetRpc(routeId);
104       GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
105       String remoteActorPath = rpcReply.getRoutePath();
106
107       if(remoteActorPath == null) {
108         LOG.debug("No remote actor found for rpc execution.");
109
110         getSender().tell(new ErrorResponse(
111           new IllegalStateException("No remote actor found for rpc execution.")), self());
112       } else {
113         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
114         Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
115             executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
116
117         getSender().tell(operationRes, self());
118       }
119     } catch (Exception e) {
120         LOG.error(e.toString());
121         getSender().tell(new ErrorResponse(e), self());
122     }
123   }
124
125   private void executeRpc(ExecuteRpc msg) {
126     LOG.debug("Executing rpc for rpc {}", msg.getRpc());
127     try {
128       Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
129       RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
130
131       CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
132       getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
133     } catch (Exception e) {
134       LOG.error(e.toString());
135       getSender().tell(new ErrorResponse(e), self());
136     }
137   }
138
139 }