2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.FutureTask;
18 import java.util.concurrent.TimeUnit;
20 import org.opendaylight.controller.sal.common.util.RpcErrors;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.connector.api.RpcRouter;
23 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
24 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
25 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
26 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
27 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
28 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
29 import org.opendaylight.yangtools.yang.common.QName;
30 import org.opendaylight.yangtools.yang.common.RpcError;
31 import org.opendaylight.yangtools.yang.common.RpcResult;
32 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
33 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.zeromq.ZMQ;
38 import com.google.common.base.Optional;
39 import com.google.common.util.concurrent.Futures;
40 import com.google.common.util.concurrent.ListenableFuture;
43 * An implementation of {@link org.opendaylight.controller.sal.core.api.RpcImplementation} that makes
46 public class ClientImpl implements RemoteRpcClient {
48 private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
50 private final ZMQ.Context context = ZMQ.context(1);
51 private final ClientRequestHandler handler;
52 private RoutingTableProvider routingTableProvider;
55 handler = new ClientRequestHandler(context);
59 public ClientImpl(ClientRequestHandler handler){
60 this.handler = handler;
64 public RoutingTableProvider getRoutingTableProvider() {
65 return routingTableProvider;
69 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
70 this.routingTableProvider = routingTableProvider;
74 public void start() {/*NOOPS*/}
80 _logger.info("Stopped");
89 * Finds remote server that can execute this rpc and sends a message to it
90 * requesting execution.
91 * The call blocks until a response from remote server is received. Its upto
92 * the client of this API to implement a timeout functionality.
94 * @param rpc remote service to be executed
95 * @param input payload for the remote service
98 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
99 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
100 routeId.setType(rpc);
102 String address = lookupRemoteAddressForGlobalRpc(routeId);
103 return sendMessage(input, routeId, address);
107 * Finds remote server that can execute this routed rpc and sends a message to it
108 * requesting execution.
109 * The call blocks until a response from remote server is received. Its upto
110 * the client of this API to implement a timeout functionality.
115 * instance identifier on which rpc is to be executed
120 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
122 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
123 routeId.setType(rpc);
124 routeId.setRoute(identifier);
126 String address = lookupRemoteAddressForRpc(routeId);
128 return sendMessage(input, routeId, address);
131 private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
132 Message request = new Message.MessageBuilder()
133 .type(Message.MessageType.REQUEST)
134 .sender(Context.getInstance().getLocalUri())
137 .payload(XmlUtils.compositeNodeToXml(input))
140 List<RpcError> errors = new ArrayList<RpcError>();
143 Message response = handler.handle(request);
144 CompositeNode payload = null;
146 if ( response != null ) {
148 _logger.info("Received response [{}]", response);
150 Object rawPayload = response.getPayload();
151 switch (response.getType()) {
153 if ( rawPayload instanceof List )
154 errors = (List) rawPayload;
158 payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
163 RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
169 return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
171 } catch (Exception e){
172 collectErrors(e, errors);
173 return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
178 * Find address for the given route identifier in routing table
179 * @param routeId route identifier
180 * @return remote network address
182 private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
183 checkNotNull(routeId, "route must not be null");
185 Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
186 checkNotNull(routingTable.isPresent(), "Routing table is null");
188 String address = null;
190 address = routingTable.get().getGlobalRoute(routeId);
191 } catch (RoutingTableException|SystemException e) {
192 _logger.error("Exception caught while looking up remote address " + e);
194 checkState(address != null, "Address not found for route [%s]", routeId);
200 * Find address for the given route identifier in routing table
201 * @param routeId route identifier
202 * @return remote network address
204 private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
205 checkNotNull(routeId, "route must not be null");
207 Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
208 checkNotNull(routingTable.isPresent(), "Routing table is null");
210 String address = routingTable.get().getLastAddedRoute(routeId);
211 checkState(address != null, "Address not found for route [%s]", routeId);
216 private void collectErrors(Exception e, List<RpcError> errors){
217 if (e == null) return;
218 if (errors == null) errors = new ArrayList<RpcError>();
220 errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
221 for (Throwable t : e.getSuppressed()) {
222 errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
227 * Closes ZMQ Context. It tries to gracefully terminate the context. If
228 * termination takes more than a second, it is forcefully shut down.
230 private void closeZmqContext() {
231 ExecutorService exec = Executors.newSingleThreadExecutor();
232 FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
239 _logger.debug("ZMQ Context terminated");
240 } catch (Exception e) {
241 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
246 exec.execute(zmqTermination);
249 zmqTermination.get(1L, TimeUnit.SECONDS);
250 } catch (Exception e) {/*ignore and continue with shutdown*/}