84df2e43f01cd54084357d58b67abbe0ea6e93e9
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import org.opendaylight.controller.sal.common.util.RpcErrors;
12 import org.opendaylight.controller.sal.common.util.Rpcs;
13 import org.opendaylight.controller.sal.connector.api.RpcRouter;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.RpcImplementation;
21 import org.opendaylight.yangtools.yang.common.QName;
22 import org.opendaylight.yangtools.yang.common.RpcError;
23 import org.opendaylight.yangtools.yang.common.RpcResult;
24 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
25 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
29
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.List;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.FutureTask;
36 import java.util.concurrent.TimeUnit;
37
38 import static com.google.common.base.Preconditions.checkNotNull;
39 import static com.google.common.base.Preconditions.checkState;
40
41 /**
42  * An implementation of {@link RpcImplementation} that makes
43  * remote RPC calls
44  */
45 public class ClientImpl implements RemoteRpcClient {
46
47   private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
48
49   private ZMQ.Context context = ZMQ.context(1);
50   private ClientRequestHandler handler;
51   private RoutingTableProvider routingTableProvider;
52
53   public ClientImpl(){
54     handler = new ClientRequestHandler(context);
55     start();
56   }
57
58   public ClientImpl(ClientRequestHandler handler){
59     this.handler = handler;
60     start();
61   }
62
63   public RoutingTableProvider getRoutingTableProvider() {
64     return routingTableProvider;
65   }
66
67   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
68     this.routingTableProvider = routingTableProvider;
69   }
70
71   @Override
72   public void start() {/*NOOPS*/}
73
74   @Override
75   public void stop() {
76     closeZmqContext();
77     handler.close();
78     _logger.info("Stopped");
79   }
80
81   @Override
82   public void close(){
83     stop();
84   }
85
86   /**
87    * Finds remote server that can execute this rpc and sends a message to it
88    * requesting execution.
89    * The call blocks until a response from remote server is received. Its upto
90    * the client of this API to implement a timeout functionality.
91    *
92    * @param rpc   remote service to be executed
93    * @param input payload for the remote service
94    * @return
95    */
96   public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
97     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
98     routeId.setType(rpc);
99
100     String address = lookupRemoteAddressForGlobalRpc(routeId);
101     return sendMessage(input, routeId, address);
102   }
103
104   /**
105    * Finds remote server that can execute this routed rpc and sends a message to it
106    * requesting execution.
107    * The call blocks until a response from remote server is received. Its upto
108    * the client of this API to implement a timeout functionality.
109    *
110    * @param rpc
111    *          rpc to be called
112    * @param identifier
113    *          instance identifier on which rpc is to be executed
114    * @param input
115    *          payload
116    * @return
117    */
118   public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
119
120     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
121     routeId.setType(rpc);
122     routeId.setRoute(identifier);
123
124     String address = lookupRemoteAddressForRpc(routeId);
125
126     return sendMessage(input, routeId, address);
127   }
128
129   private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
130     Message request = new Message.MessageBuilder()
131         .type(Message.MessageType.REQUEST)
132         .sender(Context.getInstance().getLocalUri())
133         .recipient(address)
134         .route(routeId)
135         .payload(XmlUtils.compositeNodeToXml(input))
136         .build();
137
138     List<RpcError> errors = new ArrayList<RpcError>();
139
140     try{
141       Message response = handler.handle(request);
142       CompositeNode payload = null;
143
144       if ( response != null ) {
145
146         _logger.info("Received response [{}]", response);
147
148         Object rawPayload = response.getPayload();
149         switch (response.getType()) {
150           case ERROR:
151             if ( rawPayload instanceof List )
152               errors = (List) rawPayload;
153               break;
154
155           case RESPONSE:
156             payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
157             break;
158
159           default:
160             errors.add(
161                 RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
162             );
163             break;
164
165         }
166       }
167       return Rpcs.getRpcResult(true, payload, errors);
168
169     } catch (Exception e){
170       collectErrors(e, errors);
171       return Rpcs.getRpcResult(false, null, errors);
172     }
173   }
174
175   /**
176    * Find address for the given route identifier in routing table
177    * @param  routeId route identifier
178    * @return         remote network address
179    */
180   private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){
181     checkNotNull(routeId, "route must not be null");
182
183     Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
184     checkNotNull(routingTable.isPresent(), "Routing table is null");
185
186     String address = null;
187     try {
188       address = routingTable.get().getGlobalRoute(routeId);
189     } catch (RoutingTableException|SystemException e) {
190       _logger.error("Exception caught while looking up remote address " + e);
191     }
192     checkState(address != null, "Address not found for route [%s]", routeId);
193
194     return address;
195   }
196
197   /**
198    * Find address for the given route identifier in routing table
199    * @param  routeId route identifier
200    * @return         remote network address
201    */
202   private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){
203     checkNotNull(routeId, "route must not be null");
204
205     Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
206     checkNotNull(routingTable.isPresent(), "Routing table is null");
207
208     String address = routingTable.get().getLastAddedRoute(routeId);
209     checkState(address != null, "Address not found for route [%s]", routeId);
210
211     return address;
212   }
213
214   private void collectErrors(Exception e, List<RpcError> errors){
215     if (e == null) return;
216     if (errors == null) errors = new ArrayList<RpcError>();
217
218     errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
219     for (Throwable t : e.getSuppressed()) {
220       errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
221     }
222   }
223
224   /**
225    * Closes ZMQ Context. It tries to gracefully terminate the context. If
226    * termination takes more than a second, its forcefully shutdown.
227    */
228   private void closeZmqContext() {
229     ExecutorService exec = Executors.newSingleThreadExecutor();
230     FutureTask zmqTermination = new FutureTask(new Runnable() {
231
232       @Override
233       public void run() {
234         try {
235           if (context != null)
236             context.term();
237           _logger.debug("ZMQ Context terminated");
238         } catch (Exception e) {
239           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
240         }
241       }
242     }, null);
243
244     exec.execute(zmqTermination);
245
246     try {
247       zmqTermination.get(1L, TimeUnit.SECONDS);
248     } catch (Exception e) {/*ignore and continue with shutdown*/}
249
250     exec.shutdownNow();
251   }
252 }