/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.remote.rpc;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
15 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
16 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
17 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
18 import org.opendaylight.controller.remote.rpc.messages.GetRpc;
19 import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
20 import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
21 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
22 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
23 import org.opendaylight.controller.sal.core.api.Broker;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
26 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 import java.util.concurrent.Future;
/**
 * Actor to initiate execution of remote RPC on other nodes of the cluster.
 */
36 public class RpcBroker extends AbstractUntypedActor {
/** Class-wide logger. */
private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);

/** Session on which {@code rpc(...)} is called when an rpc is executed by this actor. */
private final Broker.ProviderSession brokerSession;

/** Actor that is asked for the path of the actor registered for a given rpc. */
private final ActorRef rpcRegistry;

/** Schema context handed to the {@code XmlUtils} conversions of rpc input and output. */
private final SchemaContext schemaContext;

/**
 * Creates the broker. The constructor is private; instances are obtained through
 * {@code props(...)}.
 *
 * @param brokerSession session used to execute rpcs
 * @param rpcRegistry   actor queried for rpc routes
 * @param schemaContext schema context used for XML conversion
 */
private RpcBroker(final Broker.ProviderSession brokerSession,
                  final ActorRef rpcRegistry,
                  final SchemaContext schemaContext) {
    this.schemaContext = schemaContext;
    this.rpcRegistry = rpcRegistry;
    this.brokerSession = brokerSession;
}
49 public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
50 return Props.create(new Creator<RpcBroker>(){
53 public RpcBroker create() throws Exception {
54 return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
59 protected void handleReceive(Object message) throws Exception {
60 if(message instanceof InvokeRoutedRpc) {
61 invokeRemoteRoutedRpc((InvokeRoutedRpc) message);
62 } else if(message instanceof InvokeRpc) {
63 invokeRemoteRpc((InvokeRpc) message);
64 } else if(message instanceof ExecuteRpc) {
65 executeRpc((ExecuteRpc) message);
69 private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) {
70 // Look up the remote actor to execute rpc
71 LOG.debug("Looking up the remote actor for route {}", msg);
73 RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
74 GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId);
75 GetRoutedRpcReply rpcReply = (GetRoutedRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
77 String remoteActorPath = rpcReply.getRoutePath();
78 if(remoteActorPath == null) {
79 LOG.debug("No remote actor found for rpc execution.");
81 getSender().tell(new ErrorResponse(
82 new IllegalStateException("No remote actor found for rpc execution.")), self());
85 ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
87 Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
88 executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
90 getSender().tell(operationRes, self());
92 } catch (Exception e) {
93 LOG.error(e.toString());
94 getSender().tell(new ErrorResponse(e), self());
98 private void invokeRemoteRpc(InvokeRpc msg) {
99 // Look up the remote actor to execute rpc
100 LOG.debug("Looking up the remote actor for route {}", msg);
102 RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null);
103 GetRpc rpcMsg = new GetRpc(routeId);
104 GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
105 String remoteActorPath = rpcReply.getRoutePath();
107 if(remoteActorPath == null) {
108 LOG.debug("No remote actor found for rpc execution.");
110 getSender().tell(new ErrorResponse(
111 new IllegalStateException("No remote actor found for rpc execution.")), self());
113 ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
114 Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
115 executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
117 getSender().tell(operationRes, self());
119 } catch (Exception e) {
120 LOG.error(e.toString());
121 getSender().tell(new ErrorResponse(e), self());
125 private void executeRpc(ExecuteRpc msg) {
126 LOG.debug("Executing rpc for rpc {}", msg.getRpc());
128 Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
129 RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
131 CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
132 getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
133 } catch (Exception e) {
134 LOG.error(e.toString());
135 getSender().tell(new ErrorResponse(e), self());