Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ClientImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import org.opendaylight.controller.sal.common.util.RpcErrors;
12 import org.opendaylight.controller.sal.common.util.Rpcs;
13 import org.opendaylight.controller.sal.connector.api.RpcRouter;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
15 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
16 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
17 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
18 import org.opendaylight.controller.sal.core.api.RpcImplementation;
19 import org.opendaylight.yangtools.yang.common.QName;
20 import org.opendaylight.yangtools.yang.common.RpcError;
21 import org.opendaylight.yangtools.yang.common.RpcResult;
22 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.zeromq.ZMQ;
26
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.Set;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.FutureTask;
34 import java.util.concurrent.TimeUnit;
35
36 import static com.google.common.base.Preconditions.checkNotNull;
37 import static com.google.common.base.Preconditions.checkState;
38
39 /**
40  * An implementation of {@link RpcImplementation} that makes
41  * remote RPC calls
42  */
43 public class ClientImpl implements RemoteRpcClient {
44
45   private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
46
47   private ZMQ.Context context = ZMQ.context(1);
48   private ClientRequestHandler handler;
49   private RoutingTableProvider routingTableProvider;
50
51   public ClientImpl(){
52     handler = new ClientRequestHandler(context);
53     start();
54   }
55
56   public ClientImpl(ClientRequestHandler handler){
57     this.handler = handler;
58     start();
59   }
60
61   public RoutingTableProvider getRoutingTableProvider() {
62     return routingTableProvider;
63   }
64
65   public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
66     this.routingTableProvider = routingTableProvider;
67   }
68
69  @Override
70   public Set<QName> getSupportedRpcs(){
71     //TODO: Find the entries from routing table
72     return Collections.emptySet();
73   }
74
75   @Override
76   public void start() {/*NOOPS*/}
77
78   @Override
79   public void stop() {
80     closeZmqContext();
81     handler.close();
82     _logger.info("Stopped");
83   }
84
85   @Override
86   public void close(){
87     stop();
88   }
89
90   /**
91    * Finds remote server that can execute this rpc and sends a message to it
92    * requesting execution.
93    * The call blocks until a response from remote server is received. Its upto
94    * the client of this API to implement a timeout functionality.
95    *
96    * @param rpc   remote service to be executed
97    * @param input payload for the remote service
98    * @return
99    */
100   @Override
101   public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
102
103     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
104     routeId.setType(rpc);
105
106     String address = lookupRemoteAddress(routeId);
107
108     Message request = new Message.MessageBuilder()
109         .type(Message.MessageType.REQUEST)
110         .sender(Context.getInstance().getLocalUri())
111         .recipient(address)
112         .route(routeId)
113         .payload(XmlUtils.compositeNodeToXml(input))
114         .build();
115
116     List<RpcError> errors = new ArrayList<RpcError>();
117
118     try{
119       Message response = handler.handle(request);
120       CompositeNode payload = null;
121
122       if ( response != null )
123         payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
124
125       return Rpcs.getRpcResult(true, payload, errors);
126
127     } catch (Exception e){
128       collectErrors(e, errors);
129       return Rpcs.getRpcResult(false, null, errors);
130     }
131
132   }
133
134   /**
135    * Find address for the given route identifier in routing table
136    * @param  routeId route identifier
137    * @return         remote network address
138    */
139   private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
140     checkNotNull(routeId, "route must not be null");
141
142     Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
143     checkNotNull(routingTable.isPresent(), "Routing table is null");
144
145     Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
146     checkNotNull(addresses, "Address not found for route [%s]", routeId);
147     checkState(addresses.size() == 1,
148         "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
149
150     String address = addresses.iterator().next();
151     checkNotNull(address, "Address not found for route [%s]", routeId);
152
153     return address;
154   }
155
156   private void collectErrors(Exception e, List<RpcError> errors){
157     if (e == null) return;
158     if (errors == null) errors = new ArrayList<RpcError>();
159
160     errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
161     for (Throwable t : e.getSuppressed()) {
162       errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
163     }
164   }
165
166   /**
167    * Closes ZMQ Context. It tries to gracefully terminate the context. If
168    * termination takes more than a second, its forcefully shutdown.
169    */
170   private void closeZmqContext() {
171     ExecutorService exec = Executors.newSingleThreadExecutor();
172     FutureTask zmqTermination = new FutureTask(new Runnable() {
173
174       @Override
175       public void run() {
176         try {
177           if (context != null)
178             context.term();
179           _logger.debug("ZMQ Context terminated");
180         } catch (Exception e) {
181           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
182         }
183       }
184     }, null);
185
186     exec.execute(zmqTermination);
187
188     try {
189       zmqTermination.get(1L, TimeUnit.SECONDS);
190     } catch (Exception e) {/*ignore and continue with shutdown*/}
191
192     exec.shutdownNow();
193   }
194 }