2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
11 import org.opendaylight.controller.sal.common.util.RpcErrors;
12 import org.opendaylight.controller.sal.common.util.Rpcs;
13 import org.opendaylight.controller.sal.connector.api.RpcRouter;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
15 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
16 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
17 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
18 import org.opendaylight.controller.sal.core.api.RpcImplementation;
19 import org.opendaylight.yangtools.yang.common.QName;
20 import org.opendaylight.yangtools.yang.common.RpcError;
21 import org.opendaylight.yangtools.yang.common.RpcResult;
22 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.zeromq.ZMQ;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.FutureTask;
34 import java.util.concurrent.TimeUnit;
36 import static com.google.common.base.Preconditions.checkNotNull;
37 import static com.google.common.base.Preconditions.checkState;
40 * An implementation of {@link RpcImplementation} that makes
43 public class ClientImpl implements RemoteRpcClient {
45 private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
47 private ZMQ.Context context = ZMQ.context(1);
48 private ClientRequestHandler handler;
49 private RoutingTableProvider routingTableProvider;
52 handler = new ClientRequestHandler(context);
56 public ClientImpl(ClientRequestHandler handler){
57 this.handler = handler;
61 public RoutingTableProvider getRoutingTableProvider() {
62 return routingTableProvider;
65 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
66 this.routingTableProvider = routingTableProvider;
70 public Set<QName> getSupportedRpcs(){
71 //TODO: Find the entries from routing table
72 return Collections.emptySet();
76 public void start() {/*NOOPS*/}
82 _logger.info("Stopped");
91 * Finds remote server that can execute this rpc and sends a message to it
92 * requesting execution.
93 * The call blocks until a response from remote server is received. Its upto
94 * the client of this API to implement a timeout functionality.
96 * @param rpc remote service to be executed
97 * @param input payload for the remote service
101 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
103 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
104 routeId.setType(rpc);
106 String address = lookupRemoteAddress(routeId);
108 Message request = new Message.MessageBuilder()
109 .type(Message.MessageType.REQUEST)
110 .sender(Context.getInstance().getLocalUri())
113 .payload(XmlUtils.compositeNodeToXml(input))
116 List<RpcError> errors = new ArrayList<RpcError>();
119 Message response = handler.handle(request);
120 CompositeNode payload = null;
122 if ( response != null )
123 payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
125 return Rpcs.getRpcResult(true, payload, errors);
127 } catch (Exception e){
128 collectErrors(e, errors);
129 return Rpcs.getRpcResult(false, null, errors);
135 * Find address for the given route identifier in routing table
136 * @param routeId route identifier
137 * @return remote network address
139 private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
140 checkNotNull(routeId, "route must not be null");
142 Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
143 checkNotNull(routingTable.isPresent(), "Routing table is null");
145 Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
146 checkNotNull(addresses, "Address not found for route [%s]", routeId);
147 checkState(addresses.size() == 1,
148 "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
150 String address = addresses.iterator().next();
151 checkNotNull(address, "Address not found for route [%s]", routeId);
156 private void collectErrors(Exception e, List<RpcError> errors){
157 if (e == null) return;
158 if (errors == null) errors = new ArrayList<RpcError>();
160 errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
161 for (Throwable t : e.getSuppressed()) {
162 errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
167 * Closes ZMQ Context. It tries to gracefully terminate the context. If
168 * termination takes more than a second, its forcefully shutdown.
170 private void closeZmqContext() {
171 ExecutorService exec = Executors.newSingleThreadExecutor();
172 FutureTask zmqTermination = new FutureTask(new Runnable() {
179 _logger.debug("ZMQ Context terminated");
180 } catch (Exception e) {
181 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
186 exec.execute(zmqTermination);
189 zmqTermination.get(1L, TimeUnit.SECONDS);
190 } catch (Exception e) {/*ignore and continue with shutdown*/}