Merge changes I63383291,I1c9f10e9,I9cac529f,I269d373b,I7ede3ba5,I4afc1e15
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.FutureTask;
18 import java.util.concurrent.TimeUnit;
19
20 import org.opendaylight.controller.sal.common.util.RpcErrors;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.connector.api.RpcRouter;
23 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
24 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
25 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
26 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
27 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
28 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
29 import org.opendaylight.controller.sal.core.api.RpcImplementation;
30 import org.opendaylight.yangtools.yang.common.QName;
31 import org.opendaylight.yangtools.yang.common.RpcError;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.zeromq.ZMQ;
38
39 import com.google.common.base.Optional;
40 import com.google.common.util.concurrent.Futures;
41 import com.google.common.util.concurrent.ListenableFuture;
42
43 /**
44  * An implementation of {@link RpcImplementation} that makes
45  * remote RPC calls
46  */
47 public class ClientImpl implements RemoteRpcClient {
48
49   private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
50
51   private final ZMQ.Context context = ZMQ.context(1);
52   private final ClientRequestHandler handler;
53   private RoutingTableProvider routingTableProvider;
54
55   public ClientImpl(){
56     handler = new ClientRequestHandler(context);
57     start();
58   }
59
60   public ClientImpl(ClientRequestHandler handler){
61     this.handler = handler;
62     start();
63   }
64
65   public RoutingTableProvider getRoutingTableProvider() {
66     return routingTableProvider;
67   }
68
69   @Override
70   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
71     this.routingTableProvider = routingTableProvider;
72   }
73
74   @Override
75   public void start() {/*NOOPS*/}
76
77   @Override
78   public void stop() {
79     closeZmqContext();
80     handler.close();
81     _logger.info("Stopped");
82   }
83
84   @Override
85   public void close(){
86     stop();
87   }
88
89   /**
90    * Finds remote server that can execute this rpc and sends a message to it
91    * requesting execution.
92    * The call blocks until a response from remote server is received. Its upto
93    * the client of this API to implement a timeout functionality.
94    *
95    * @param rpc   remote service to be executed
96    * @param input payload for the remote service
97    * @return
98    */
99   public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
100     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
101     routeId.setType(rpc);
102
103     String address = lookupRemoteAddressForGlobalRpc(routeId);
104     return sendMessage(input, routeId, address);
105   }
106
107   /**
108    * Finds remote server that can execute this routed rpc and sends a message to it
109    * requesting execution.
110    * The call blocks until a response from remote server is received. Its upto
111    * the client of this API to implement a timeout functionality.
112    *
113    * @param rpc
114    *          rpc to be called
115    * @param identifier
116    *          instance identifier on which rpc is to be executed
117    * @param input
118    *          payload
119    * @return
120    */
121   public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
122
123     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
124     routeId.setType(rpc);
125     routeId.setRoute(identifier);
126
127     String address = lookupRemoteAddressForRpc(routeId);
128
129     return sendMessage(input, routeId, address);
130   }
131
132   private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
133     Message request = new Message.MessageBuilder()
134         .type(Message.MessageType.REQUEST)
135         .sender(Context.getInstance().getLocalUri())
136         .recipient(address)
137         .route(routeId)
138         .payload(XmlUtils.compositeNodeToXml(input))
139         .build();
140
141     List<RpcError> errors = new ArrayList<RpcError>();
142
143     try{
144       Message response = handler.handle(request);
145       CompositeNode payload = null;
146
147       if ( response != null ) {
148
149         _logger.info("Received response [{}]", response);
150
151         Object rawPayload = response.getPayload();
152         switch (response.getType()) {
153           case ERROR:
154             if ( rawPayload instanceof List )
155               errors = (List) rawPayload;
156               break;
157
158           case RESPONSE:
159             payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
160             break;
161
162           default:
163             errors.add(
164                 RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
165             );
166             break;
167
168         }
169       }
170       return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
171
172     } catch (Exception e){
173       collectErrors(e, errors);
174       return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
175     }
176   }
177
178   /**
179    * Find address for the given route identifier in routing table
180    * @param  routeId route identifier
181    * @return         remote network address
182    */
183   private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
184     checkNotNull(routeId, "route must not be null");
185
186     Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
187     checkNotNull(routingTable.isPresent(), "Routing table is null");
188
189     String address = null;
190     try {
191       address = routingTable.get().getGlobalRoute(routeId);
192     } catch (RoutingTableException|SystemException e) {
193       _logger.error("Exception caught while looking up remote address " + e);
194     }
195     checkState(address != null, "Address not found for route [%s]", routeId);
196
197     return address;
198   }
199
200   /**
201    * Find address for the given route identifier in routing table
202    * @param  routeId route identifier
203    * @return         remote network address
204    */
205   private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
206     checkNotNull(routeId, "route must not be null");
207
208     Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
209     checkNotNull(routingTable.isPresent(), "Routing table is null");
210
211     String address = routingTable.get().getLastAddedRoute(routeId);
212     checkState(address != null, "Address not found for route [%s]", routeId);
213
214     return address;
215   }
216
217   private void collectErrors(Exception e, List<RpcError> errors){
218     if (e == null) return;
219     if (errors == null) errors = new ArrayList<RpcError>();
220
221     errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
222     for (Throwable t : e.getSuppressed()) {
223       errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
224     }
225   }
226
227   /**
228    * Closes ZMQ Context. It tries to gracefully terminate the context. If
229    * termination takes more than a second, its forcefully shutdown.
230    */
231   private void closeZmqContext() {
232     ExecutorService exec = Executors.newSingleThreadExecutor();
233     FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
234
235       @Override
236       public void run() {
237         try {
238           if (context != null)
239             context.term();
240           _logger.debug("ZMQ Context terminated");
241         } catch (Exception e) {
242           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
243         }
244       }
245     }, null);
246
247     exec.execute(zmqTermination);
248
249     try {
250       zmqTermination.get(1L, TimeUnit.SECONDS);
251     } catch (Exception e) {/*ignore and continue with shutdown*/}
252
253     exec.shutdownNow();
254   }
255 }