Merge "BUG 484 Anyxml normalized node"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import java.net.Inet4Address;
11 import java.net.InetAddress;
12 import java.net.NetworkInterface;
13 import java.net.SocketException;
14 import java.util.Enumeration;
15 import java.util.HashSet;
16 import java.util.Set;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.FutureTask;
20 import java.util.concurrent.TimeUnit;
21
22 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import org.zeromq.ZMQ;
27
28 import com.google.common.base.Optional;
29 import com.google.common.base.Preconditions;
30
31 /**
32  * ZeroMq based implementation of RpcRouter.
33  */
34 public class ServerImpl implements RemoteRpcServer {
35
36   private final Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
37
38   private ExecutorService serverPool;
39   protected ServerRequestHandler handler;
40
41   private Set<QName> remoteServices;
42   private ProviderSession brokerSession;
43   private ZMQ.Context context;
44
45   private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
46   private final int HANDLER_WORKER_COUNT = 2;
47   private final int HWM = 200;//high water mark on sockets
48   private volatile State status = State.STOPPED;
49
50   private String serverAddress;
51   private final int port;
52
53   public static enum State {
54     STARTING, STARTED, STOPPED;
55   }
56
57   public ServerImpl(int port) {
58     this.port = port;
59   }
60
61   public State getStatus() {
62     return this.status;
63   }
64
65   public Optional<ServerRequestHandler> getHandler() {
66     return Optional.fromNullable(this.handler);
67   }
68
69   public void setBrokerSession(ProviderSession session) {
70     this.brokerSession = session;
71   }
72
73   public Optional<ProviderSession> getBrokerSession() {
74     return Optional.fromNullable(this.brokerSession);
75   }
76
77   public Optional<ZMQ.Context> getZmqContext() {
78     return Optional.fromNullable(this.context);
79   }
80
81   public String getServerAddress() {
82     return serverAddress;
83   }
84
85   public String getHandlerAddress() {
86     return HANDLER_INPROC_ADDRESS;
87   }
88
89   /**
90    *
91    */
92   public void start() {
93     Preconditions.checkState(State.STOPPED == this.getStatus(),
94         "Remote RPC Server is already running");
95
96     status = State.STARTING;
97     _logger.debug("Remote RPC Server is starting...");
98
99     String hostIpAddress = findIpAddress();
100
101     //Log and silently die as per discussion in the bug (bug-362)
102     //https://bugs.opendaylight.org/show_bug.cgi?id=362
103     //
104     // A tracking enhancement defect (bug-366) is created to properly fix this issue
105     //https://bugs.opendaylight.org/show_bug.cgi?id=366
106     //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
107
108     if (hostIpAddress == null) {
109       _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
110       stop();
111       return;
112     }
113
114     this.serverAddress = new StringBuilder(hostIpAddress).
115         append(":").
116         append(port).
117         toString();
118
119     context = ZMQ.context(1);
120     remoteServices = new HashSet<QName>();//
121     serverPool = Executors.newSingleThreadExecutor();//main server thread
122     serverPool.execute(receive()); // Start listening rpc requests
123
124     status = State.STARTED;
125     _logger.info("Remote RPC Server started [{}]", getServerAddress());
126   }
127
128   public void stop(){
129     close();
130   }
131
132   /**
133    *
134    */
135   @Override
136   public void close() {
137
138     if (State.STOPPED == this.getStatus()) return; //do nothing
139
140     if (serverPool != null)
141       serverPool.shutdown();
142
143     closeZmqContext();
144
145     status = State.STOPPED;
146     _logger.info("Remote RPC Server stopped");
147   }
148
149   /**
150    * Closes ZMQ Context. It tries to gracefully terminate the context. If
151    * termination takes more than 5 seconds, its forcefully shutdown.
152    */
153   private void closeZmqContext() {
154     ExecutorService exec = Executors.newSingleThreadExecutor();
155     FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
156
157       @Override
158       public void run() {
159         try {
160           if (context != null)
161             context.term();
162           _logger.debug("ZMQ Context terminated gracefully!");
163         } catch (Exception e) {
164           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
165         }
166       }
167     }, null);
168
169     exec.execute(zmqTermination);
170
171     try {
172       zmqTermination.get(5L, TimeUnit.SECONDS);
173     } catch (Exception e) {/*ignore and continue with shutdown*/}
174
175     exec.shutdownNow();
176   }
177
178   /**
179    * Main listener thread that spawns {@link ServerRequestHandler} as workers.
180    *
181    * @return
182    */
183   private Runnable receive() {
184     return new Runnable() {
185
186       @Override
187       public void run() {
188         Thread.currentThread().setName("remote-rpc-server");
189         _logger.debug("Remote RPC Server main thread starting...");
190
191         //socket clients connect to (frontend)
192         ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
193
194         //socket RequestHandlers connect to (backend)
195         ZMQ.Socket workers = context.socket(ZMQ.DEALER);
196
197         try (SocketPair capturePair = new SocketPair();
198              ServerRequestHandler requestHandler = new ServerRequestHandler(context,
199                  brokerSession,
200                  HANDLER_WORKER_COUNT,
201                  HANDLER_INPROC_ADDRESS,
202                  getServerAddress());) {
203
204           handler = requestHandler;
205           clients.setHWM(HWM);
206           clients.bind("tcp://*:" + port);
207           workers.setHWM(HWM);
208           workers.bind(HANDLER_INPROC_ADDRESS);
209           //start worker threads
210           _logger.debug("Remote RPC Server worker threads starting...");
211           requestHandler.start();
212           //start capture thread
213           // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
214           //  Connect work threads to client threads via a queue
215           ZMQ.proxy(clients, workers, null);//capturePair.getSender());
216
217         } catch (Exception e) {
218           _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
219         } finally {
220           if (clients != null) clients.close();
221           if (workers != null) workers.close();
222           _logger.info("Remote RPC Server stopped");
223         }
224       }
225     };
226   }
227
228   /**
229    * Finds IPv4 address of the local VM
230    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
231    * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
232    * Should we use IP or hostname?
233    *
234    * @return
235    */
236   private String findIpAddress() {
237     Enumeration<?> e = null;
238     try {
239       e = NetworkInterface.getNetworkInterfaces();
240     } catch (SocketException e1) {
241       _logger.error("Failed to get list of interfaces", e1);
242       return null;
243     }
244     while (e.hasMoreElements()) {
245
246       NetworkInterface n = (NetworkInterface) e.nextElement();
247
248       Enumeration<?> ee = n.getInetAddresses();
249       while (ee.hasMoreElements()) {
250         InetAddress i = (InetAddress) ee.nextElement();
251         _logger.debug("Trying address {}", i);
252         if ((i instanceof Inet4Address) && (!i.isLoopbackAddress())) {
253           String hostAddress = i.getHostAddress();
254           _logger.debug("Settled on host address {}", hostAddress);
255           return hostAddress;
256         }
257       }
258     }
259
260     _logger.error("Failed to find a suitable host address");
261     return null;
262   }
263
264 }