Merge "Exception for URI /restconf/operations/module_name:rpc ended with slash"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import org.opendaylight.controller.sal.common.util.RpcErrors;
12 import org.opendaylight.controller.sal.common.util.Rpcs;
13 import org.opendaylight.controller.sal.connector.api.RpcRouter;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.RpcImplementation;
21 import org.opendaylight.yangtools.yang.common.QName;
22 import org.opendaylight.yangtools.yang.common.RpcError;
23 import org.opendaylight.yangtools.yang.common.RpcResult;
24 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
25 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
29
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.FutureTask;
35 import java.util.concurrent.TimeUnit;
36
37 import static com.google.common.base.Preconditions.checkNotNull;
38 import static com.google.common.base.Preconditions.checkState;
39
40 /**
41  * An implementation of {@link RpcImplementation} that makes
42  * remote RPC calls
43  */
44 public class ClientImpl implements RemoteRpcClient {
45
46   private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
47
48   private ZMQ.Context context = ZMQ.context(1);
49   private ClientRequestHandler handler;
50   private RoutingTableProvider routingTableProvider;
51
52   public ClientImpl(){
53     handler = new ClientRequestHandler(context);
54     start();
55   }
56
57   public ClientImpl(ClientRequestHandler handler){
58     this.handler = handler;
59     start();
60   }
61
62   public RoutingTableProvider getRoutingTableProvider() {
63     return routingTableProvider;
64   }
65
66   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
67     this.routingTableProvider = routingTableProvider;
68   }
69
70   @Override
71   public void start() {/*NOOPS*/}
72
73   @Override
74   public void stop() {
75     closeZmqContext();
76     handler.close();
77     _logger.info("Stopped");
78   }
79
80   @Override
81   public void close(){
82     stop();
83   }
84
85   /**
86    * Finds remote server that can execute this rpc and sends a message to it
87    * requesting execution.
88    * The call blocks until a response from remote server is received. Its upto
89    * the client of this API to implement a timeout functionality.
90    *
91    * @param rpc   remote service to be executed
92    * @param input payload for the remote service
93    * @return
94    */
95   public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
96     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
97     routeId.setType(rpc);
98
99     String address = lookupRemoteAddressForGlobalRpc(routeId);
100     return sendMessage(input, routeId, address);
101   }
102
103   /**
104    * Finds remote server that can execute this routed rpc and sends a message to it
105    * requesting execution.
106    * The call blocks until a response from remote server is received. Its upto
107    * the client of this API to implement a timeout functionality.
108    *
109    * @param rpc
110    *          rpc to be called
111    * @param identifier
112    *          instance identifier on which rpc is to be executed
113    * @param input
114    *          payload
115    * @return
116    */
117   public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
118
119     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
120     routeId.setType(rpc);
121     routeId.setRoute(identifier);
122
123     String address = lookupRemoteAddressForRpc(routeId);
124
125     return sendMessage(input, routeId, address);
126   }
127
128   private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
129     Message request = new Message.MessageBuilder()
130         .type(Message.MessageType.REQUEST)
131         .sender(Context.getInstance().getLocalUri())
132         .recipient(address)
133         .route(routeId)
134         .payload(XmlUtils.compositeNodeToXml(input))
135         .build();
136
137     List<RpcError> errors = new ArrayList<RpcError>();
138
139     try{
140       Message response = handler.handle(request);
141       CompositeNode payload = null;
142
143       if ( response != null )
144         payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
145
146       return Rpcs.getRpcResult(true, payload, errors);
147
148     } catch (Exception e){
149       collectErrors(e, errors);
150       return Rpcs.getRpcResult(false, null, errors);
151     }
152   }
153
154   /**
155    * Find address for the given route identifier in routing table
156    * @param  routeId route identifier
157    * @return         remote network address
158    */
159   private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){
160     checkNotNull(routeId, "route must not be null");
161
162     Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
163     checkNotNull(routingTable.isPresent(), "Routing table is null");
164
165     String address = null;
166     try {
167       address = routingTable.get().getGlobalRoute(routeId);
168     } catch (RoutingTableException|SystemException e) {
169       _logger.error("Exception caught while looking up remote address " + e);
170     }
171     checkState(address != null, "Address not found for route [%s]", routeId);
172
173     return address;
174   }
175
176   /**
177    * Find address for the given route identifier in routing table
178    * @param  routeId route identifier
179    * @return         remote network address
180    */
181   private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){
182     checkNotNull(routeId, "route must not be null");
183
184     Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
185     checkNotNull(routingTable.isPresent(), "Routing table is null");
186
187     String address = routingTable.get().getLastAddedRoute(routeId);
188     checkState(address != null, "Address not found for route [%s]", routeId);
189
190     return address;
191   }
192
193   private void collectErrors(Exception e, List<RpcError> errors){
194     if (e == null) return;
195     if (errors == null) errors = new ArrayList<RpcError>();
196
197     errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
198     for (Throwable t : e.getSuppressed()) {
199       errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
200     }
201   }
202
203   /**
204    * Closes ZMQ Context. It tries to gracefully terminate the context. If
205    * termination takes more than a second, its forcefully shutdown.
206    */
207   private void closeZmqContext() {
208     ExecutorService exec = Executors.newSingleThreadExecutor();
209     FutureTask zmqTermination = new FutureTask(new Runnable() {
210
211       @Override
212       public void run() {
213         try {
214           if (context != null)
215             context.term();
216           _logger.debug("ZMQ Context terminated");
217         } catch (Exception e) {
218           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
219         }
220       }
221     }, null);
222
223     exec.execute(zmqTermination);
224
225     try {
226       zmqTermination.get(1L, TimeUnit.SECONDS);
227     } catch (Exception e) {/*ignore and continue with shutdown*/}
228
229     exec.shutdownNow();
230   }
231 }