Merge "Bug 509: Fixed incorrect merging of Data Store Writes / Events"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Predicate;
13 import com.google.common.collect.Sets;
14 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
22 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
23 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
29
30 import java.net.Inet4Address;
31 import java.net.InetAddress;
32 import java.net.NetworkInterface;
33 import java.net.SocketException;
34 import java.util.Enumeration;
35 import java.util.HashSet;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.FutureTask;
41 import java.util.concurrent.TimeUnit;
42
43 import static com.google.common.base.Preconditions.checkNotNull;
44 import static com.google.common.base.Preconditions.checkState;
45
46 /**
47  * ZeroMq based implementation of RpcRouter.
48  */
49 public class ServerImpl implements RemoteRpcServer {
50
51   private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
52
53   private ExecutorService serverPool;
54   protected ServerRequestHandler handler;
55
56   private Set<QName> remoteServices;
57   private ProviderSession brokerSession;
58   private ZMQ.Context context;
59
60   private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
61   private final int HANDLER_WORKER_COUNT = 2;
62   private final int HWM = 200;//high water mark on sockets
63   private volatile State status = State.STOPPED;
64
65   private String serverAddress;
66   private int port;
67
68   public static enum State {
69     STARTING, STARTED, STOPPED;
70   }
71
72   public ServerImpl(int port) {
73     this.port = port;
74   }
75
76   public State getStatus() {
77     return this.status;
78   }
79
80   public Optional<ServerRequestHandler> getHandler() {
81     return Optional.fromNullable(this.handler);
82   }
83
84   public void setBrokerSession(ProviderSession session) {
85     this.brokerSession = session;
86   }
87
88   public Optional<ProviderSession> getBrokerSession() {
89     return Optional.fromNullable(this.brokerSession);
90   }
91
92   public Optional<ZMQ.Context> getZmqContext() {
93     return Optional.fromNullable(this.context);
94   }
95
96   public String getServerAddress() {
97     return serverAddress;
98   }
99
100   public String getHandlerAddress() {
101     return HANDLER_INPROC_ADDRESS;
102   }
103
104   /**
105    *
106    */
107   public void start() {
108     Preconditions.checkState(State.STOPPED == this.getStatus(),
109         "Remote RPC Server is already running");
110
111     status = State.STARTING;
112     _logger.debug("Remote RPC Server is starting...");
113
114     String hostIpAddress = findIpAddress();
115
116     //Log and silently die as per discussion in the bug (bug-362)
117     //https://bugs.opendaylight.org/show_bug.cgi?id=362
118     //
119     // A tracking enhancement defect (bug-366) is created to properly fix this issue
120     //https://bugs.opendaylight.org/show_bug.cgi?id=366
121     //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
122
123     if (hostIpAddress == null) {
124       _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
125       stop();
126       return;
127     }
128
129     this.serverAddress = new StringBuilder(hostIpAddress).
130         append(":").
131         append(port).
132         toString();
133
134     context = ZMQ.context(1);
135     remoteServices = new HashSet<QName>();//
136     serverPool = Executors.newSingleThreadExecutor();//main server thread
137     serverPool.execute(receive()); // Start listening rpc requests
138
139     status = State.STARTED;
140     _logger.info("Remote RPC Server started [{}]", getServerAddress());
141   }
142
143   public void stop(){
144     close();
145   }
146
147   /**
148    *
149    */
150   @Override
151   public void close() {
152
153     if (State.STOPPED == this.getStatus()) return; //do nothing
154
155     if (serverPool != null)
156       serverPool.shutdown();
157
158     closeZmqContext();
159
160     status = State.STOPPED;
161     _logger.info("Remote RPC Server stopped");
162   }
163
164   /**
165    * Closes ZMQ Context. It tries to gracefully terminate the context. If
166    * termination takes more than 5 seconds, its forcefully shutdown.
167    */
168   private void closeZmqContext() {
169     ExecutorService exec = Executors.newSingleThreadExecutor();
170     FutureTask zmqTermination = new FutureTask(new Runnable() {
171
172       @Override
173       public void run() {
174         try {
175           if (context != null)
176             context.term();
177           _logger.debug("ZMQ Context terminated gracefully!");
178         } catch (Exception e) {
179           _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
180         }
181       }
182     }, null);
183
184     exec.execute(zmqTermination);
185
186     try {
187       zmqTermination.get(5L, TimeUnit.SECONDS);
188     } catch (Exception e) {/*ignore and continue with shutdown*/}
189
190     exec.shutdownNow();
191   }
192
193   /**
194    * Main listener thread that spawns {@link ServerRequestHandler} as workers.
195    *
196    * @return
197    */
198   private Runnable receive() {
199     return new Runnable() {
200
201       @Override
202       public void run() {
203         Thread.currentThread().setName("remote-rpc-server");
204         _logger.debug("Remote RPC Server main thread starting...");
205
206         //socket clients connect to (frontend)
207         ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
208
209         //socket RequestHandlers connect to (backend)
210         ZMQ.Socket workers = context.socket(ZMQ.DEALER);
211
212         try (SocketPair capturePair = new SocketPair();
213              ServerRequestHandler requestHandler = new ServerRequestHandler(context,
214                  brokerSession,
215                  HANDLER_WORKER_COUNT,
216                  HANDLER_INPROC_ADDRESS,
217                  getServerAddress());) {
218
219           handler = requestHandler;
220           clients.setHWM(HWM);
221           clients.bind("tcp://*:" + port);
222           workers.setHWM(HWM);
223           workers.bind(HANDLER_INPROC_ADDRESS);
224           //start worker threads
225           _logger.debug("Remote RPC Server worker threads starting...");
226           requestHandler.start();
227           //start capture thread
228           // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
229           //  Connect work threads to client threads via a queue
230           ZMQ.proxy(clients, workers, null);//capturePair.getSender());
231
232         } catch (Exception e) {
233           _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
234         } finally {
235           if (clients != null) clients.close();
236           if (workers != null) workers.close();
237           _logger.info("Remote RPC Server stopped");
238         }
239       }
240     };
241   }
242
243   /**
244    * Finds IPv4 address of the local VM
245    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
246    * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
247    * Should we use IP or hostname?
248    *
249    * @return
250    */
251   private String findIpAddress() {
252     Enumeration e = null;
253     try {
254       e = NetworkInterface.getNetworkInterfaces();
255     } catch (SocketException e1) {
256       _logger.error("Failed to get list of interfaces", e1);
257       return null;
258     }
259     while (e.hasMoreElements()) {
260
261       NetworkInterface n = (NetworkInterface) e.nextElement();
262
263       Enumeration ee = n.getInetAddresses();
264       while (ee.hasMoreElements()) {
265         InetAddress i = (InetAddress) ee.nextElement();
266         _logger.debug("Trying address {}", i);
267         if ((i instanceof Inet4Address) && (!i.isLoopbackAddress())) {
268           String hostAddress = i.getHostAddress();
269           _logger.debug("Settled on host address {}", hostAddress);
270           return hostAddress;
271         }
272       }
273     }
274
275     _logger.error("Failed to find a suitable host address");
276     return null;
277   }
278
279 }