Bump versions by x.(y+1).z
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbConnectionService.java
1 /*
2  * Copyright © 2014, 2017 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.lib.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelFutureListener;
19 import io.netty.channel.ChannelInitializer;
20 import io.netty.channel.socket.SocketChannel;
21 import io.netty.handler.codec.string.StringEncoder;
22 import io.netty.handler.logging.LogLevel;
23 import io.netty.handler.logging.LoggingHandler;
24 import io.netty.handler.ssl.SslHandler;
25 import io.netty.handler.timeout.IdleStateHandler;
26 import io.netty.handler.timeout.ReadTimeoutHandler;
27 import java.net.InetAddress;
28 import java.nio.charset.StandardCharsets;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import javax.inject.Inject;
43 import javax.inject.Singleton;
44 import javax.net.ssl.SSLContext;
45 import javax.net.ssl.SSLEngine;
46 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
47 import javax.net.ssl.SSLPeerUnverifiedException;
48 import org.opendaylight.aaa.cert.api.ICertificateManager;
49 import org.opendaylight.ovsdb.lib.OvsdbClient;
50 import org.opendaylight.ovsdb.lib.OvsdbConnection;
51 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
52 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
53 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
54 import org.opendaylight.ovsdb.lib.jsonrpc.ExceptionHandler;
55 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
56 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
57 import org.osgi.service.component.annotations.Activate;
58 import org.osgi.service.component.annotations.Component;
59 import org.osgi.service.component.annotations.Reference;
60 import org.osgi.service.metatype.annotations.AttributeDefinition;
61 import org.osgi.service.metatype.annotations.Designate;
62 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 /**
67  * OvsDBConnectionService provides OVSDB connection management functionality which includes
68  * both Active and Passive connections.
69  * From the Library perspective, Active OVSDB connections are those that are initiated from
70  * the Controller towards the ovsdb-manager.
71  * While Passive OVSDB connections are those that are initiated from the ovs towards
72  * the controller.
73  *
74  * <p>Applications that use OvsDBConnectionService can use the OvsDBConnection class' connect APIs
75  * to initiate Active connections and can listen to the asynchronous Passive connections via
76  * registerConnectionListener listener API.
77  *
78  * <p>The library is designed as Java modular component that can work in both OSGi and non-OSGi
79  * environment. Hence a single instance of the service will be active (via Service Registry in OSGi)
80  * and a Singleton object in a non-OSGi environment.
81  */
82 @Singleton
83 @Component(service = OvsdbConnection.class, configurationPid = "org.opendaylight.ovsdb.library")
84 @Designate(ocd = OvsdbConnectionService.Configuration.class)
85 public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
86     @ObjectClassDefinition
87     public @interface Configuration {
88         @AttributeDefinition
89         String ovsdb$_$listener$_$ip() default DEFAULT_LISTENER_IP;
90         @AttributeDefinition(min = "1", max = "65535")
91         int ovsdb$_$listener$_$port() default DEFAULT_LISTENER_PORT;
92         @AttributeDefinition
93         int ovsdb$_$rpc$_$task$_$timeout() default DEFAULT_RPC_TASK_TIMEOUT;
94         @AttributeDefinition
95         boolean use$_$ssl() default false;
96         @AttributeDefinition
97         int json$_$rpc$_$decoder$_$max$_$frame$_$length() default DEFAULT_JSON_RPC_DECODER_MAX_FRAME_LENGTH;
98     }
99
100     private class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
101         @Override
102         public void initChannel(final SocketChannel channel) throws Exception {
103             channel.pipeline().addLast(
104                 //new LoggingHandler(LogLevel.INFO),
105                 new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
106                 UTF8_ENCODER,
107                 new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
108                 new ReadTimeoutHandler(READ_TIMEOUT),
109                 new ExceptionHandler(OvsdbConnectionService.this));
110         }
111     }
112
113     private class SslClientChannelInitializer extends ClientChannelInitializer {
114         private final ICertificateManager certManagerSrv;
115         private final InetAddress address;
116         private final int port;
117
118         SslClientChannelInitializer(final ICertificateManager certManagerSrv, final InetAddress address,
119                 final int port) {
120             this.certManagerSrv = requireNonNull(certManagerSrv);
121             this.address = requireNonNull(address);
122             this.port = port;
123         }
124
125         @Override
126         public void initChannel(final SocketChannel channel) throws Exception {
127             SSLContext sslContext = certManagerSrv.getServerContext();
128             if (sslContext != null) {
129                 /* First add ssl handler if ssl context is given */
130                 SSLEngine engine = sslContext.createSSLEngine(address.toString(), port);
131                 engine.setUseClientMode(true);
132                 channel.pipeline().addLast("ssl", new SslHandler(engine));
133             }
134
135             super.initChannel(channel);
136         }
137     }
138
139     private class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
140         @Override
141         public final void initChannel(final SocketChannel channel) {
142             LOG.debug("New Passive channel created : {}", channel);
143             initChannelImpl(channel);
144         }
145
146         void initChannelImpl(final SocketChannel channel) {
147             channel.pipeline().addLast(
148                 new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
149                 UTF8_ENCODER,
150                 new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
151                 new ReadTimeoutHandler(READ_TIMEOUT),
152                 new ExceptionHandler(OvsdbConnectionService.this));
153             handleNewPassiveConnection(channel);
154         }
155     }
156
157     private final class SslServerChannelInitializer extends ServerChannelInitializer {
158         private final ICertificateManager certManagerSrv;
159         private final String[] protocols;
160         private final String[] cipherSuites;
161
162         SslServerChannelInitializer(final ICertificateManager certManagerSrv, final String[] protocols,
163                 final String[] cipherSuites) {
164             this.certManagerSrv = requireNonNull(certManagerSrv);
165             this.protocols = requireNonNull(protocols);
166             this.cipherSuites = requireNonNull(cipherSuites);
167
168         }
169
170         SslServerChannelInitializer(final ICertificateManager certManagerSrv) {
171             this(certManagerSrv, certManagerSrv.getTlsProtocols(), certManagerSrv.getCipherSuites());
172         }
173
174         @Override
175         void initChannelImpl(final SocketChannel channel) {
176             /* Add SSL handler first if SSL context is provided */
177             final SSLContext sslContext = certManagerSrv.getServerContext();
178             if (sslContext != null) {
179                 SSLEngine engine = sslContext.createSSLEngine();
180                 engine.setUseClientMode(false); // work in a server mode
181                 engine.setNeedClientAuth(true); // need client authentication
182                 if (protocols != null && protocols.length > 0) {
183                     //Set supported protocols
184                     engine.setEnabledProtocols(protocols);
185                     LOG.debug("Supported ssl protocols {}",
186                         Arrays.toString(engine.getSupportedProtocols()));
187                     LOG.debug("Enabled ssl protocols {}",
188                         Arrays.toString(engine.getEnabledProtocols()));
189                 }
190                 if (cipherSuites != null && cipherSuites.length > 0) {
191                     //Set supported cipher suites
192                     engine.setEnabledCipherSuites(cipherSuites);
193                     LOG.debug("Enabled cipher suites {}",
194                         Arrays.toString(engine.getEnabledCipherSuites()));
195                 }
196                 channel.pipeline().addLast("ssl", new SslHandler(engine));
197             }
198             super.initChannelImpl(channel);
199         }
200     }
201
202     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
203     private static final int IDLE_READER_TIMEOUT = 30;
204     private static final int READ_TIMEOUT = 180;
205     private static final int RETRY_PERIOD = 100; // retry after 100 milliseconds
206     private static final String DEFAULT_LISTENER_IP = "0.0.0.0";
207     private static final int DEFAULT_LISTENER_PORT = 6640;
208     private static final int DEFAULT_RPC_TASK_TIMEOUT = 1000;
209     private static final int DEFAULT_JSON_RPC_DECODER_MAX_FRAME_LENGTH = 100000;
210
211     private static final StringEncoder UTF8_ENCODER = new StringEncoder(StandardCharsets.UTF_8);
212
213     private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(10,
214             new ThreadFactoryBuilder().setNameFormat("OVSDBPassiveConnServ-%d").build());
215
216     private static final ExecutorService CONNECTION_NOTIFIER_SERVICE = Executors
217             .newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("OVSDBConnNotifSer-%d").build());
218
219     private static final StalePassiveConnectionService STALE_PASSIVE_CONNECTION_SERVICE =
220             new StalePassiveConnectionService(client -> {
221                 notifyListenerForPassiveConnection(client);
222                 return null;
223             });
224
225     // FIXME: these should not be static
226     private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
227     private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
228
229     private final NettyBootstrapFactory bootstrapFactory;
230     private final ICertificateManager certManagerSrv;
231
232     private final boolean useSSL;
233     private final int jsonRpcDecoderMaxFrameLength;
234
235     private final AtomicBoolean singletonCreated = new AtomicBoolean(false);
236     private volatile Channel serverChannel;
237
238     private final String listenerIp;
239     private final int listenerPort;
240
241     @Inject
242     public OvsdbConnectionService(final NettyBootstrapFactory bootstrapFactory,
243             final ICertificateManager certManagerSrv) {
244         this(bootstrapFactory, certManagerSrv, DEFAULT_LISTENER_IP, DEFAULT_LISTENER_PORT, DEFAULT_RPC_TASK_TIMEOUT,
245             false, DEFAULT_JSON_RPC_DECODER_MAX_FRAME_LENGTH);
246     }
247
248     @Activate
249     public OvsdbConnectionService(@Reference final NettyBootstrapFactory bootstrapFactory,
250             @Reference(target = "(type=default-certificate-manager)") final ICertificateManager certManagerSrv,
251             final Configuration configuration) {
252         this(bootstrapFactory, certManagerSrv, configuration.ovsdb$_$listener$_$ip(),
253             configuration.ovsdb$_$listener$_$port(), configuration.ovsdb$_$rpc$_$task$_$timeout(),
254             configuration.use$_$ssl(), configuration.json$_$rpc$_$decoder$_$max$_$frame$_$length());
255     }
256
257     public OvsdbConnectionService(final NettyBootstrapFactory bootstrapFactory,
258             final ICertificateManager certManagerSrv, final String listenerIp, final int listenerPort,
259             final int ovsdbRpcTaskTimeout, final boolean useSSL, final int jsonRpcDecoderMaxFrameLength) {
260         this.bootstrapFactory = requireNonNull(bootstrapFactory);
261         this.certManagerSrv = requireNonNull(certManagerSrv);
262         this.listenerIp = requireNonNull(listenerIp);
263         this.listenerPort = listenerPort;
264         this.useSSL = useSSL;
265         this.jsonRpcDecoderMaxFrameLength = jsonRpcDecoderMaxFrameLength;
266
267         // FIXME: static state!
268         JsonRpcEndpoint.setReaperInterval(ovsdbRpcTaskTimeout);
269         LOG.info("OVSDB IP for listening connection is set to : {}", listenerIp);
270         LOG.info("OVSDB port for listening connection is set to : {}", listenerPort);
271         LOG.info("Json Rpc Decoder Max Frame Length set to : {}", jsonRpcDecoderMaxFrameLength);
272     }
273
274     /**
275      * If the SSL flag is enabled, the method internally will establish TLS communication using the default
276      * ODL certificateManager SSLContext and attributes.
277      */
278     @Override
279     public OvsdbClient connect(final InetAddress address, final int port) {
280         if (useSSL) {
281             if (certManagerSrv == null) {
282                 LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
283                 return null;
284             }
285             return connectWithSsl(address, port, certManagerSrv);
286         } else {
287             return connectWithSsl(address, port, null /* SslContext */);
288         }
289     }
290
291     @Override
292     @SuppressWarnings("checkstyle:IllegalCatch")
293     public OvsdbClient connectWithSsl(final InetAddress address, final int port,
294             final ICertificateManager certificateManagerSrv) {
295
296         final ChannelFuture future = bootstrapFactory.newClient()
297                 .handler(certificateManagerSrv == null ? new ClientChannelInitializer() :
298                         new SslClientChannelInitializer(certificateManagerSrv, address, port))
299                 .connect(address, port);
300
301         try {
302             future.sync();
303         } catch (InterruptedException e) {
304             LOG.warn("Failed to connect {}:{}", address, port, e);
305             return null;
306         } catch (Throwable throwable) {
307             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
308             LOG.error("Error while binding to address {}, port {}", address, port, throwable);
309             throw throwable;
310         }
311
312         return getChannelClient(future.channel(), ConnectionType.ACTIVE, SocketConnectionType.SSL);
313     }
314
315     @Override
316     public void disconnect(final OvsdbClient client) {
317         if (client == null) {
318             return;
319         }
320         Channel channel = CONNECTIONS.get(client);
321         if (channel != null) {
322             //It's an explicit disconnect from user, so no need to notify back
323             //to user about the disconnect.
324             client.setConnectionPublished(false);
325             channel.disconnect();
326         }
327         CONNECTIONS.remove(client);
328     }
329
330     @Override
331     public void registerConnectionListener(final OvsdbConnectionListener listener) {
332         LOG.info("registerConnectionListener: registering {}", listener.getClass().getSimpleName());
333         if (CONNECTION_LISTENERS.add(listener)) {
334             LOG.info("registerConnectionListener: registered {} notifying exisitng connections",
335                     listener.getClass().getSimpleName());
336             //notify only the first time if called multiple times
337             notifyAlreadyExistingConnectionsToListener(listener);
338         }
339     }
340
341     private void notifyAlreadyExistingConnectionsToListener(final OvsdbConnectionListener listener) {
342         for (final OvsdbClient client : getConnections()) {
343             CONNECTION_NOTIFIER_SERVICE.execute(() -> {
344                 LOG.trace("Connection {} notified to listener {}", client.getConnectionInfo(), listener);
345                 listener.connected(client);
346             });
347         }
348     }
349
350     @Override
351     public void unregisterConnectionListener(final OvsdbConnectionListener listener) {
352         CONNECTION_LISTENERS.remove(listener);
353     }
354
355     private static OvsdbClient getChannelClient(final Channel channel, final ConnectionType type,
356             final SocketConnectionType socketConnType) {
357
358         JsonRpcEndpoint endpoint = new JsonRpcEndpoint(channel);
359         channel.pipeline().addLast(endpoint);
360
361         OvsdbClientImpl client = new OvsdbClientImpl(endpoint, channel, type, socketConnType);
362         client.setConnectionPublished(true);
363         CONNECTIONS.put(client, channel);
364         channel.closeFuture().addListener(new ChannelConnectionHandler(client));
365         return client;
366     }
367
368     /**
369      * Method that initiates the Passive OVSDB channel listening functionality.
370      * By default the ovsdb passive connection will listen in port 6640 which can
371      * be overridden using the ovsdb.listenPort system property.
372      */
373     @Override
374     public synchronized boolean startOvsdbManager() {
375         final int ovsdbListenerPort = listenerPort;
376         final String ovsdbListenerIp = listenerIp;
377         if (singletonCreated.getAndSet(true)) {
378             return false;
379         }
380
381         LOG.info("startOvsdbManager: Starting");
382         ovsdbManager(ovsdbListenerIp, ovsdbListenerPort);
383         return true;
384     }
385
386     /**
387      * Method that initiates the Passive OVSDB channel listening functionality
388      * with ssl.By default the ovsdb passive connection will listen in port
389      * 6640 which can be overridden using the ovsdb.listenPort system property.
390      */
391     @Override
392     public synchronized boolean startOvsdbManagerWithSsl(final String ovsdbListenIp, final int ovsdbListenPort,
393                                                          final ICertificateManager certificateManagerSrv,
394                                                          final String[] protocols, final String[] cipherSuites) {
395         if (!singletonCreated.getAndSet(true)) {
396             ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort,
397                 certificateManagerSrv == null ? new ServerChannelInitializer()
398                         : new SslServerChannelInitializer(certificateManagerSrv, protocols, cipherSuites));
399             return true;
400         } else {
401             return false;
402         }
403     }
404
405     @Override
406     public synchronized boolean restartOvsdbManagerWithSsl(final String ovsdbListenIp, final int ovsdbListenPort,
407             final ICertificateManager certificateManagerSrv, final String[] protocols, final String[] cipherSuites) {
408         if (singletonCreated.getAndSet(false) && serverChannel != null) {
409             serverChannel.close();
410             LOG.info("Server channel closed");
411         }
412         serverChannel = null;
413         return startOvsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort,
414             certificateManagerSrv, protocols, cipherSuites);
415     }
416
417     /**
418      * OVSDB Passive listening thread that uses Netty ServerBootstrap to open
419      * passive connection handle channel callbacks.
420      * If the SSL flag is enabled, the method internally will establish TLS communication using the default
421      * ODL certificateManager SSLContext and attributes.
422      */
423     private void ovsdbManager(final String ip, final int port) {
424         if (useSSL) {
425             if (certManagerSrv == null) {
426                 LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
427                 return;
428             }
429             ovsdbManagerWithSsl(ip, port, new SslServerChannelInitializer(certManagerSrv));
430         } else {
431             ovsdbManagerWithSsl(ip, port, new ServerChannelInitializer());
432         }
433     }
434
435     /**
436      * OVSDB Passive listening thread that uses Netty ServerBootstrap to open
437      * passive connection with Ssl and handle channel callbacks.
438      */
439     private void ovsdbManagerWithSsl(final String ip, final int port, final ServerChannelInitializer channelHandler) {
440         bootstrapFactory.newServer()
441             .handler(new LoggingHandler(LogLevel.INFO))
442             .childHandler(channelHandler)
443             // Start the server.
444             .bind(ip, port)
445             // Propagate the channel when its ready
446             .addListener((ChannelFutureListener) future -> {
447                 if (future.isSuccess()) {
448                     serverChannel = future.channel();
449                 } else {
450                     LOG.error("Error while binding to address {}, port {}", ip, port, future.cause());
451                 }
452             });
453     }
454
455     @SuppressWarnings("checkstyle:IllegalCatch")
456     private static void handleNewPassiveConnection(final OvsdbClient client) {
457         ListenableFuture<List<String>> echoFuture = client.echo();
458         LOG.debug("Send echo message to probe the OVSDB switch {}", client.getConnectionInfo());
459         Futures.addCallback(echoFuture, new FutureCallback<List<String>>() {
460             @Override
461             public void onSuccess(final List<String> result) {
462                 LOG.info("Probe was successful to OVSDB switch {}", client.getConnectionInfo());
463                 //List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
464                 try {
465                     getPassiveClientsFromSameNode(client);
466                 } catch (Throwable throwable) {
467                     LOG.error("Failed to get passive clients from same node", throwable);
468                 }
469                 notifyListenerForPassiveConnection(client);
470                 /*
471                 if (clientsFromSameNode.size() == 0) {
472                     notifyListenerForPassiveConnection(client);
473                 } else {
474                     STALE_PASSIVE_CONNECTION_SERVICE.handleNewPassiveConnection(client, clientsFromSameNode);
475                 }
476                 */
477             }
478
479             @Override
480             public void onFailure(final Throwable failureException) {
481                 LOG.error("Probe failed to OVSDB switch. Disconnecting the channel {}", client.getConnectionInfo());
482                 client.disconnect();
483             }
484         }, CONNECTION_NOTIFIER_SERVICE);
485     }
486
487     private static void handleNewPassiveConnection(final Channel channel) {
488         if (!channel.isOpen()) {
489             LOG.warn("Channel {} is not open, skipped further processing of the connection.",channel);
490             return;
491         }
492         SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
493         if (sslHandler != null) {
494             class HandleNewPassiveSslRunner implements Runnable {
495                 private int retryTimes = 3;
496
497                 private void retry() {
498                     if (retryTimes > 0) {
499                         EXECUTOR_SERVICE.schedule(this,  RETRY_PERIOD, TimeUnit.MILLISECONDS);
500                     } else {
501                         LOG.debug("channel closed {}", channel);
502                         channel.disconnect();
503                     }
504                     retryTimes--;
505                 }
506
507                 @Override
508                 public void run() {
509                     HandshakeStatus status = sslHandler.engine().getHandshakeStatus();
510                     LOG.debug("Handshake status {}", status);
511                     switch (status) {
512                         case FINISHED:
513                         case NOT_HANDSHAKING:
514                             if (sslHandler.engine().getSession().getCipherSuite()
515                                     .equals("SSL_NULL_WITH_NULL_NULL")) {
516                                 // Not begin handshake yet. Retry later.
517                                 LOG.debug("handshake not begin yet {}", status);
518                                 retry();
519                             } else {
520                               //Check if peer is trusted before notifying listeners
521                                 try {
522                                     sslHandler.engine().getSession().getPeerCertificates();
523                                     //Handshake done. Notify listener.
524                                     OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
525                                         SocketConnectionType.SSL);
526                                     handleNewPassiveConnection(client);
527                                 } catch (SSLPeerUnverifiedException e) {
528                                     //Trust manager is still checking peer certificate. Retry later
529                                     LOG.debug("Peer certifiacte is not verified yet {}", status);
530                                     retry();
531                                 }
532                             }
533                             break;
534
535                         case NEED_UNWRAP:
536                         case NEED_TASK:
537                             //Handshake still ongoing. Retry later.
538                             LOG.debug("handshake not done yet {}", status);
539                             retry();
540                             break;
541
542                         case NEED_WRAP:
543                             if (sslHandler.engine().getSession().getCipherSuite()
544                                     .equals("SSL_NULL_WITH_NULL_NULL")) {
545                                 /* peer not authenticated. No need to notify listener in this case. */
546                                 LOG.error("Ssl handshake fail. channel {}", channel);
547                                 channel.disconnect();
548                             } else {
549                                 /*
550                                  * peer is authenticated. Give some time to wait for completion.
551                                  * If status is still NEED_WRAP, client might already disconnect.
552                                  * This happens when the first time client connects to controller in two-way handshake.
553                                  * After obtaining controller certificate, client will disconnect and start
554                                  * new connection with controller certificate it obtained.
555                                  * In this case no need to do anything for the first connection attempt. Just skip
556                                  * since client will reconnect later.
557                                  */
558                                 LOG.debug("handshake not done yet {}", status);
559                                 retry();
560                             }
561                             break;
562
563                         default:
564                             LOG.error("unknown hadshake status {}", status);
565                     }
566                 }
567             }
568
569             EXECUTOR_SERVICE.schedule(new HandleNewPassiveSslRunner(),
570                     RETRY_PERIOD, TimeUnit.MILLISECONDS);
571         } else {
572             EXECUTOR_SERVICE.execute(() -> {
573                 OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
574                     SocketConnectionType.NON_SSL);
575                 handleNewPassiveConnection(client);
576             });
577         }
578     }
579
580     public static void channelClosed(final OvsdbClient client) {
581         LOG.info("Connection closed {}", client.getConnectionInfo());
582         CONNECTIONS.remove(client);
583         if (client.isConnectionPublished()) {
584             for (OvsdbConnectionListener listener : CONNECTION_LISTENERS) {
585                 listener.disconnected(client);
586             }
587         }
588         STALE_PASSIVE_CONNECTION_SERVICE.clientDisconnected(client);
589     }
590
591     @Override
592     public Collection<OvsdbClient> getConnections() {
593         return CONNECTIONS.keySet();
594     }
595
596     @Override
597     public void close() throws Exception {
598         LOG.info("OvsdbConnectionService closed");
599         JsonRpcEndpoint.close();
600     }
601
602     @Override
603     public OvsdbClient getClient(final Channel channel) {
604         for (Entry<OvsdbClient, Channel> entry : CONNECTIONS.entrySet()) {
605             OvsdbClient client = entry.getKey();
606             Channel ctx = entry.getValue();
607             if (ctx.equals(channel)) {
608                 return client;
609             }
610         }
611         return null;
612     }
613
614     private static List<OvsdbClient> getPassiveClientsFromSameNode(final OvsdbClient ovsdbClient) {
615         List<OvsdbClient> passiveClients = new ArrayList<>();
616         for (OvsdbClient client : CONNECTIONS.keySet()) {
617             if (!client.equals(ovsdbClient)
618                     && client.getConnectionInfo().getRemoteAddress()
619                             .equals(ovsdbClient.getConnectionInfo().getRemoteAddress())
620                     && client.getConnectionInfo().getType() == ConnectionType.PASSIVE) {
621                 passiveClients.add(client);
622             }
623         }
624         return passiveClients;
625     }
626
627     public static void notifyListenerForPassiveConnection(final OvsdbClient client) {
628         client.setConnectionPublished(true);
629         for (final OvsdbConnectionListener listener : CONNECTION_LISTENERS) {
630             CONNECTION_NOTIFIER_SERVICE.execute(() -> {
631                 LOG.trace("Connection {} notified to listener {}", client.getConnectionInfo(), listener);
632                 listener.connected(client);
633             });
634         }
635     }
636 }