Implement finding a primary based on the shard name and do basic wiring of Distribute...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.FutureTask;
18 import java.util.concurrent.TimeUnit;
19
20 import org.opendaylight.controller.sal.common.util.RpcErrors;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.connector.api.RpcRouter;
23 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
24 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
25 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
26 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
27 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
28 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
29 import org.opendaylight.yangtools.yang.common.QName;
30 import org.opendaylight.yangtools.yang.common.RpcError;
31 import org.opendaylight.yangtools.yang.common.RpcResult;
32 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
33 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.zeromq.ZMQ;
37
38 import com.google.common.base.Optional;
39 import com.google.common.util.concurrent.Futures;
40 import com.google.common.util.concurrent.ListenableFuture;
41
42 /**
43  * An implementation of {@link org.opendaylight.controller.sal.core.api.RpcImplementation} that makes
44  * remote RPC calls
45  */
46 public class ClientImpl implements RemoteRpcClient {
47
48   private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
49
50   private final ZMQ.Context context = ZMQ.context(1);
51   private final ClientRequestHandler handler;
52   private RoutingTableProvider routingTableProvider;
53
54   public ClientImpl(){
55     handler = new ClientRequestHandler(context);
56     start();
57   }
58
59   public ClientImpl(ClientRequestHandler handler){
60     this.handler = handler;
61     start();
62   }
63
64   public RoutingTableProvider getRoutingTableProvider() {
65     return routingTableProvider;
66   }
67
68   @Override
69   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
70     this.routingTableProvider = routingTableProvider;
71   }
72
73   @Override
74   public void start() {/*NOOPS*/}
75
76   @Override
77   public void stop() {
78     closeZmqContext();
79     handler.close();
80     _logger.info("Stopped");
81   }
82
83   @Override
84   public void close(){
85     stop();
86   }
87
88   /**
89    * Finds remote server that can execute this rpc and sends a message to it
90    * requesting execution.
91    * The call blocks until a response from remote server is received. Its upto
92    * the client of this API to implement a timeout functionality.
93    *
94    * @param rpc   remote service to be executed
95    * @param input payload for the remote service
96    * @return
97    */
98   public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
99     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
100     routeId.setType(rpc);
101
102     String address = lookupRemoteAddressForGlobalRpc(routeId);
103     return sendMessage(input, routeId, address);
104   }
105
106   /**
107    * Finds remote server that can execute this routed rpc and sends a message to it
108    * requesting execution.
109    * The call blocks until a response from remote server is received. Its upto
110    * the client of this API to implement a timeout functionality.
111    *
112    * @param rpc
113    *          rpc to be called
114    * @param identifier
115    *          instance identifier on which rpc is to be executed
116    * @param input
117    *          payload
118    * @return
119    */
120   public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
121
122     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
123     routeId.setType(rpc);
124     routeId.setRoute(identifier);
125
126     String address = lookupRemoteAddressForRpc(routeId);
127
128     return sendMessage(input, routeId, address);
129   }
130
131   private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
132     Message request = new Message.MessageBuilder()
133         .type(Message.MessageType.REQUEST)
134         .sender(Context.getInstance().getLocalUri())
135         .recipient(address)
136         .route(routeId)
137         .payload(XmlUtils.compositeNodeToXml(input))
138         .build();
139
140     List<RpcError> errors = new ArrayList<RpcError>();
141
142     try{
143       Message response = handler.handle(request);
144       CompositeNode payload = null;
145
146       if ( response != null ) {
147
148         _logger.info("Received response [{}]", response);
149
150         Object rawPayload = response.getPayload();
151         switch (response.getType()) {
152           case ERROR:
153             if ( rawPayload instanceof List )
154               errors = (List) rawPayload;
155               break;
156
157           case RESPONSE:
158             payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
159             break;
160
161           default:
162             errors.add(
163                 RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
164             );
165             break;
166
167         }
168       }
169       return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
170
171     } catch (Exception e){
172       collectErrors(e, errors);
173       return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
174     }
175   }
176
177   /**
178    * Find address for the given route identifier in routing table
179    * @param  routeId route identifier
180    * @return         remote network address
181    */
182   private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
183     checkNotNull(routeId, "route must not be null");
184
185     Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
186     checkNotNull(routingTable.isPresent(), "Routing table is null");
187
188     String address = null;
189     try {
190       address = routingTable.get().getGlobalRoute(routeId);
191     } catch (RoutingTableException|SystemException e) {
192       _logger.error("Exception caught while looking up remote address " + e);
193     }
194     checkState(address != null, "Address not found for route [%s]", routeId);
195
196     return address;
197   }
198
199   /**
200    * Find address for the given route identifier in routing table
201    * @param  routeId route identifier
202    * @return         remote network address
203    */
204   private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
205     checkNotNull(routeId, "route must not be null");
206
207     Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
208     checkNotNull(routingTable.isPresent(), "Routing table is null");
209
210     String address = routingTable.get().getLastAddedRoute(routeId);
211     checkState(address != null, "Address not found for route [%s]", routeId);
212
213     return address;
214   }
215
216   private void collectErrors(Exception e, List<RpcError> errors){
217     if (e == null) return;
218     if (errors == null) errors = new ArrayList<RpcError>();
219
220     errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
221     for (Throwable t : e.getSuppressed()) {
222       errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
223     }
224   }
225
226   /**
227    * Closes ZMQ Context. It tries to gracefully terminate the context. If
228    * termination takes more than a second, its forcefully shutdown.
229    */
230   private void closeZmqContext() {
231     ExecutorService exec = Executors.newSingleThreadExecutor();
232     FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
233
234       @Override
235       public void run() {
236         try {
237           if (context != null)
238             context.term();
239           _logger.debug("ZMQ Context terminated");
240         } catch (Exception e) {
241           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
242         }
243       }
244     }, null);
245
246     exec.execute(zmqTermination);
247
248     try {
249       zmqTermination.get(1L, TimeUnit.SECONDS);
250     } catch (Exception e) {/*ignore and continue with shutdown*/}
251
252     exec.shutdownNow();
253   }
254 }