2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.FutureTask;
18 import java.util.concurrent.TimeUnit;
20 import org.opendaylight.controller.sal.common.util.RpcErrors;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.connector.api.RpcRouter;
23 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
24 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
25 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
26 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
27 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
28 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
29 import org.opendaylight.controller.sal.core.api.RpcImplementation;
30 import org.opendaylight.yangtools.yang.common.QName;
31 import org.opendaylight.yangtools.yang.common.RpcError;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.zeromq.ZMQ;
39 import com.google.common.base.Optional;
40 import com.google.common.util.concurrent.Futures;
41 import com.google.common.util.concurrent.ListenableFuture;
44 * An implementation of {@link RpcImplementation} that makes
47 public class ClientImpl implements RemoteRpcClient {
49 private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
51 private final ZMQ.Context context = ZMQ.context(1);
52 private final ClientRequestHandler handler;
53 private RoutingTableProvider routingTableProvider;
56 handler = new ClientRequestHandler(context);
60 public ClientImpl(ClientRequestHandler handler){
61 this.handler = handler;
65 public RoutingTableProvider getRoutingTableProvider() {
66 return routingTableProvider;
70 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
71 this.routingTableProvider = routingTableProvider;
75 public void start() {/*NOOPS*/}
81 _logger.info("Stopped");
90 * Finds remote server that can execute this rpc and sends a message to it
91 * requesting execution.
92 * The call blocks until a response from remote server is received. Its upto
93 * the client of this API to implement a timeout functionality.
95 * @param rpc remote service to be executed
96 * @param input payload for the remote service
99 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
100 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
101 routeId.setType(rpc);
103 String address = lookupRemoteAddressForGlobalRpc(routeId);
104 return sendMessage(input, routeId, address);
108 * Finds remote server that can execute this routed rpc and sends a message to it
109 * requesting execution.
110 * The call blocks until a response from remote server is received. Its upto
111 * the client of this API to implement a timeout functionality.
116 * instance identifier on which rpc is to be executed
121 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
123 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
124 routeId.setType(rpc);
125 routeId.setRoute(identifier);
127 String address = lookupRemoteAddressForRpc(routeId);
129 return sendMessage(input, routeId, address);
132 private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
133 Message request = new Message.MessageBuilder()
134 .type(Message.MessageType.REQUEST)
135 .sender(Context.getInstance().getLocalUri())
138 .payload(XmlUtils.compositeNodeToXml(input))
141 List<RpcError> errors = new ArrayList<RpcError>();
144 Message response = handler.handle(request);
145 CompositeNode payload = null;
147 if ( response != null ) {
149 _logger.info("Received response [{}]", response);
151 Object rawPayload = response.getPayload();
152 switch (response.getType()) {
154 if ( rawPayload instanceof List )
155 errors = (List) rawPayload;
159 payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
164 RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
170 return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
172 } catch (Exception e){
173 collectErrors(e, errors);
174 return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
179 * Find address for the given route identifier in routing table
180 * @param routeId route identifier
181 * @return remote network address
183 private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){
184 checkNotNull(routeId, "route must not be null");
186 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
187 checkNotNull(routingTable.isPresent(), "Routing table is null");
189 String address = null;
191 address = routingTable.get().getGlobalRoute(routeId);
192 } catch (RoutingTableException|SystemException e) {
193 _logger.error("Exception caught while looking up remote address " + e);
195 checkState(address != null, "Address not found for route [%s]", routeId);
201 * Find address for the given route identifier in routing table
202 * @param routeId route identifier
203 * @return remote network address
205 private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){
206 checkNotNull(routeId, "route must not be null");
208 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
209 checkNotNull(routingTable.isPresent(), "Routing table is null");
211 String address = routingTable.get().getLastAddedRoute(routeId);
212 checkState(address != null, "Address not found for route [%s]", routeId);
217 private void collectErrors(Exception e, List<RpcError> errors){
218 if (e == null) return;
219 if (errors == null) errors = new ArrayList<RpcError>();
221 errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
222 for (Throwable t : e.getSuppressed()) {
223 errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
228 * Closes ZMQ Context. It tries to gracefully terminate the context. If
229 * termination takes more than a second, its forcefully shutdown.
231 private void closeZmqContext() {
232 ExecutorService exec = Executors.newSingleThreadExecutor();
233 FutureTask zmqTermination = new FutureTask(new Runnable() {
240 _logger.debug("ZMQ Context terminated");
241 } catch (Exception e) {
242 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
247 exec.execute(zmqTermination);
250 zmqTermination.get(1L, TimeUnit.SECONDS);
251 } catch (Exception e) {/*ignore and continue with shutdown*/}