Reuse StringEncoders for all connections
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbConnectionService.java
1 /*
2  * Copyright © 2014, 2017 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.lib.impl;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import io.netty.bootstrap.Bootstrap;
16 import io.netty.bootstrap.ServerBootstrap;
17 import io.netty.channel.AdaptiveRecvByteBufAllocator;
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelInitializer;
21 import io.netty.channel.ChannelOption;
22 import io.netty.channel.EventLoopGroup;
23 import io.netty.channel.nio.NioEventLoopGroup;
24 import io.netty.channel.socket.SocketChannel;
25 import io.netty.channel.socket.nio.NioServerSocketChannel;
26 import io.netty.channel.socket.nio.NioSocketChannel;
27 import io.netty.handler.codec.string.StringEncoder;
28 import io.netty.handler.logging.LogLevel;
29 import io.netty.handler.logging.LoggingHandler;
30 import io.netty.handler.ssl.SslHandler;
31 import io.netty.handler.timeout.IdleStateHandler;
32 import io.netty.handler.timeout.ReadTimeoutHandler;
33 import java.net.InetAddress;
34 import java.nio.charset.StandardCharsets;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collection;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import javax.inject.Inject;
49 import javax.inject.Singleton;
50 import javax.net.ssl.SSLContext;
51 import javax.net.ssl.SSLEngine;
52 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
53 import javax.net.ssl.SSLPeerUnverifiedException;
54 import org.apache.aries.blueprint.annotation.service.Reference;
55 import org.apache.aries.blueprint.annotation.service.Service;
56 import org.eclipse.jdt.annotation.Nullable;
57 import org.opendaylight.aaa.cert.api.ICertificateManager;
58 import org.opendaylight.ovsdb.lib.OvsdbClient;
59 import org.opendaylight.ovsdb.lib.OvsdbConnection;
60 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
61 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.SocketConnectionType;
62 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
63 import org.opendaylight.ovsdb.lib.jsonrpc.ExceptionHandler;
64 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
65 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 /**
70  * OvsDBConnectionService provides OVSDB connection management functionality which includes
71  * both Active and Passive connections.
72  * From the Library perspective, Active OVSDB connections are those that are initiated from
73  * the Controller towards the ovsdb-manager.
74  * While Passive OVSDB connections are those that are initiated from the ovs towards
75  * the controller.
76  *
77  * <p>Applications that use OvsDBConnectionService can use the OvsDBConnection class' connect APIs
78  * to initiate Active connections and can listen to the asynchronous Passive connections via
79  * registerConnectionListener listener API.
80  *
81  * <p>The library is designed as Java modular component that can work in both OSGi and non-OSGi
82  * environment. Hence a single instance of the service will be active (via Service Registry in OSGi)
83  * and a Singleton object in a non-OSGi environment.
84  */
85 @Singleton
86 @Service(classes = OvsdbConnection.class)
87 public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
88     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
89     private static final int IDLE_READER_TIMEOUT = 30;
90     private static final int READ_TIMEOUT = 180;
91     private static final String OVSDB_RPC_TASK_TIMEOUT_PARAM = "ovsdb-rpc-task-timeout";
92     private static final String USE_SSL = "use-ssl";
93     private static final int RETRY_PERIOD = 100; // retry after 100 milliseconds
94
95     private static final StringEncoder UTF8_ENCODER = new StringEncoder(StandardCharsets.UTF_8);
96
97     private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(10,
98             new ThreadFactoryBuilder().setNameFormat("OVSDBPassiveConnServ-%d").build());
99
100     private static final ExecutorService CONNECTION_NOTIFIER_SERVICE = Executors
101             .newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("OVSDBConnNotifSer-%d").build());
102
103     private static final StalePassiveConnectionService STALE_PASSIVE_CONNECTION_SERVICE =
104             new StalePassiveConnectionService((client) -> {
105                 notifyListenerForPassiveConnection(client);
106                 return null;
107             });
108
109     private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
110     private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
111
112     private volatile boolean useSSL = false;
113     private final ICertificateManager certManagerSrv;
114
115     private volatile int jsonRpcDecoderMaxFrameLength = 100000;
116     private volatile Channel serverChannel;
117
118     private final AtomicBoolean singletonCreated = new AtomicBoolean(false);
119     private volatile String listenerIp = "0.0.0.0";
120     private volatile int listenerPort = 6640;
121
122     @Inject
123     public OvsdbConnectionService(@Reference(filter = "type=default-certificate-manager")
124             final ICertificateManager certManagerSrv) {
125         this.certManagerSrv = certManagerSrv;
126     }
127
128     /**
129      * If the SSL flag is enabled, the method internally will establish TLS communication using the default
130      * ODL certificateManager SSLContext and attributes.
131      */
132     @Override
133     public OvsdbClient connect(final InetAddress address, final int port) {
134         if (useSSL) {
135             if (certManagerSrv == null) {
136                 LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
137                 return null;
138             }
139             return connectWithSsl(address, port, certManagerSrv);
140         } else {
141             return connectWithSsl(address, port, null /* SslContext */);
142         }
143     }
144
145     @Override
146     @SuppressWarnings("checkstyle:IllegalCatch")
147     public OvsdbClient connectWithSsl(final InetAddress address, final int port,
148             final ICertificateManager certificateManagerSrv) {
149         try {
150             Bootstrap bootstrap = new Bootstrap();
151             bootstrap.group(new NioEventLoopGroup());
152             bootstrap.channel(NioSocketChannel.class);
153             bootstrap.option(ChannelOption.TCP_NODELAY, true);
154             bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
155
156             bootstrap.handler(new ChannelInitializer<SocketChannel>() {
157                 @Override
158                 public void initChannel(final SocketChannel channel) throws Exception {
159                     if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
160                         SSLContext sslContext = certificateManagerSrv.getServerContext();
161                         /* First add ssl handler if ssl context is given */
162                         SSLEngine engine =
163                             sslContext.createSSLEngine(address.toString(), port);
164                         engine.setUseClientMode(true);
165                         channel.pipeline().addLast("ssl", new SslHandler(engine));
166                     }
167                     channel.pipeline().addLast(
168                             //new LoggingHandler(LogLevel.INFO),
169                             new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
170                             UTF8_ENCODER,
171                             new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
172                             new ReadTimeoutHandler(READ_TIMEOUT),
173                             new ExceptionHandler(OvsdbConnectionService.this));
174                 }
175             });
176
177             ChannelFuture future = bootstrap.connect(address, port).sync();
178             Channel channel = future.channel();
179             return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
180         } catch (InterruptedException e) {
181             LOG.warn("Failed to connect {}:{}", address, port, e);
182         } catch (Throwable throwable) {
183             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
184             LOG.error("Error while binding to address {}, port {}", address, port, throwable);
185             throw throwable;
186         }
187         return null;
188     }
189
190     @Override
191     public void disconnect(final OvsdbClient client) {
192         if (client == null) {
193             return;
194         }
195         Channel channel = CONNECTIONS.get(client);
196         if (channel != null) {
197             //It's an explicit disconnect from user, so no need to notify back
198             //to user about the disconnect.
199             client.setConnectionPublished(false);
200             channel.disconnect();
201         }
202         CONNECTIONS.remove(client);
203     }
204
205     @Override
206     public void registerConnectionListener(final OvsdbConnectionListener listener) {
207         LOG.info("registerConnectionListener: registering {}", listener.getClass().getSimpleName());
208         CONNECTION_LISTENERS.add(listener);
209         notifyAlreadyExistingConnectionsToListener(listener);
210     }
211
212     private void notifyAlreadyExistingConnectionsToListener(final OvsdbConnectionListener listener) {
213         for (final OvsdbClient client : getConnections()) {
214             CONNECTION_NOTIFIER_SERVICE.execute(() -> {
215                 LOG.trace("Connection {} notified to listener {}", client.getConnectionInfo(), listener);
216                 listener.connected(client);
217             });
218         }
219     }
220
221     @Override
222     public void unregisterConnectionListener(final OvsdbConnectionListener listener) {
223         CONNECTION_LISTENERS.remove(listener);
224     }
225
226     private static OvsdbClient getChannelClient(final Channel channel, final ConnectionType type,
227             final SocketConnectionType socketConnType) {
228
229         JsonRpcEndpoint endpoint = new JsonRpcEndpoint(channel);
230         channel.pipeline().addLast(endpoint);
231
232         OvsdbClientImpl client = new OvsdbClientImpl(endpoint, channel, type, socketConnType);
233         client.setConnectionPublished(true);
234         CONNECTIONS.put(client, channel);
235         channel.closeFuture().addListener(new ChannelConnectionHandler(client));
236         return client;
237     }
238
239     /**
240      * Method that initiates the Passive OVSDB channel listening functionality.
241      * By default the ovsdb passive connection will listen in port 6640 which can
242      * be overridden using the ovsdb.listenPort system property.
243      */
244     @Override
245     public synchronized boolean startOvsdbManager() {
246         final int ovsdbListenerPort = this.listenerPort;
247         final String ovsdbListenerIp = this.listenerIp;
248         if (!singletonCreated.getAndSet(true)) {
249             LOG.info("startOvsdbManager: Starting");
250             new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
251             return true;
252         } else {
253             return false;
254         }
255     }
256
257     /**
258      * Method that initiates the Passive OVSDB channel listening functionality
259      * with ssl.By default the ovsdb passive connection will listen in port
260      * 6640 which can be overridden using the ovsdb.listenPort system property.
261      */
262     @Override
263     public synchronized boolean startOvsdbManagerWithSsl(final String ovsdbListenIp, final int ovsdbListenPort,
264                                                          final ICertificateManager certificateManagerSrv,
265                                                          final String[] protocols, final String[] cipherSuites) {
266         if (!singletonCreated.getAndSet(true)) {
267             new Thread(() -> ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort,
268                     certificateManagerSrv, protocols, cipherSuites)).start();
269             return true;
270         } else {
271             return false;
272         }
273     }
274
275     @Override
276     public synchronized boolean restartOvsdbManagerWithSsl(final String ovsdbListenIp,
277         final int ovsdbListenPort,
278         final ICertificateManager certificateManagerSrv,
279         final String[] protocols,
280         final String[] cipherSuites) {
281         if (singletonCreated.getAndSet(false) && serverChannel != null) {
282             serverChannel.close();
283             LOG.info("Server channel closed");
284         }
285         serverChannel = null;
286         return startOvsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort,
287             certificateManagerSrv, protocols, cipherSuites);
288     }
289
290     /**
291      * OVSDB Passive listening thread that uses Netty ServerBootstrap to open
292      * passive connection handle channel callbacks.
293      * If the SSL flag is enabled, the method internally will establish TLS communication using the default
294      * ODL certificateManager SSLContext and attributes.
295      */
296     private void ovsdbManager(final String ip, final int port) {
297         if (useSSL) {
298             if (certManagerSrv == null) {
299                 LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
300                 return;
301             }
302             ovsdbManagerWithSsl(ip, port, certManagerSrv, certManagerSrv.getTlsProtocols(),
303                     certManagerSrv.getCipherSuites());
304         } else {
305             ovsdbManagerWithSsl(ip, port, null /* SslContext */, null, null);
306         }
307     }
308
309     /**
310      * OVSDB Passive listening thread that uses Netty ServerBootstrap to open
311      * passive connection with Ssl and handle channel callbacks.
312      */
313     @SuppressWarnings("checkstyle:IllegalCatch")
314     private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
315                                             final String[] protocols, final String[] cipherSuites) {
316         EventLoopGroup bossGroup = new NioEventLoopGroup();
317         EventLoopGroup workerGroup = new NioEventLoopGroup();
318         try {
319             ServerBootstrap serverBootstrap = new ServerBootstrap();
320             serverBootstrap.group(bossGroup, workerGroup)
321                     .channel(NioServerSocketChannel.class)
322                     .option(ChannelOption.SO_BACKLOG, 100)
323                     .handler(new LoggingHandler(LogLevel.INFO))
324                     .childHandler(new ChannelInitializer<SocketChannel>() {
325                         @Override
326                         public void initChannel(final SocketChannel channel) throws Exception {
327                             LOG.debug("New Passive channel created : {}", channel);
328                             if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
329                                 /* Add SSL handler first if SSL context is provided */
330                                 SSLContext sslContext = certificateManagerSrv.getServerContext();
331                                 SSLEngine engine = sslContext.createSSLEngine();
332                                 engine.setUseClientMode(false); // work in a server mode
333                                 engine.setNeedClientAuth(true); // need client authentication
334                                 if (protocols != null && protocols.length > 0) {
335                                     //Set supported protocols
336                                     engine.setEnabledProtocols(protocols);
337                                     LOG.debug("Supported ssl protocols {}",
338                                             Arrays.toString(engine.getSupportedProtocols()));
339                                     LOG.debug("Enabled ssl protocols {}",
340                                             Arrays.toString(engine.getEnabledProtocols()));
341                                 }
342                                 if (cipherSuites != null && cipherSuites.length > 0) {
343                                     //Set supported cipher suites
344                                     engine.setEnabledCipherSuites(cipherSuites);
345                                     LOG.debug("Enabled cipher suites {}",
346                                             Arrays.toString(engine.getEnabledCipherSuites()));
347                                 }
348                                 channel.pipeline().addLast("ssl", new SslHandler(engine));
349                             }
350
351                             channel.pipeline().addLast(
352                                  new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
353                                  UTF8_ENCODER,
354                                  new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
355                                  new ReadTimeoutHandler(READ_TIMEOUT),
356                                  new ExceptionHandler(OvsdbConnectionService.this));
357
358                             handleNewPassiveConnection(channel);
359                         }
360                     });
361             serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
362             serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,
363                     new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
364             // Start the server.
365             ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
366             Channel serverListenChannel = channelFuture.channel();
367             serverChannel = serverListenChannel;
368             // Wait until the server socket is closed.
369             serverListenChannel.closeFuture().sync();
370         } catch (InterruptedException e) {
371             LOG.error("Thread interrupted", e);
372         } catch (Throwable throwable) {
373             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
374             LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
375             throw throwable;
376         } finally {
377             // Shut down all event loops to terminate all threads.
378             bossGroup.shutdownGracefully();
379             workerGroup.shutdownGracefully();
380         }
381     }
382
383     private static void handleNewPassiveConnection(final OvsdbClient client) {
384         ListenableFuture<List<String>> echoFuture = client.echo();
385         LOG.debug("Send echo message to probe the OVSDB switch {}",client.getConnectionInfo());
386         Futures.addCallback(echoFuture, new FutureCallback<List<String>>() {
387             @Override
388             public void onSuccess(@Nullable final List<String> result) {
389                 LOG.debug("Probe was successful to OVSDB switch {}",client.getConnectionInfo());
390                 List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
391                 if (clientsFromSameNode.size() == 0) {
392                     notifyListenerForPassiveConnection(client);
393                 } else {
394                     STALE_PASSIVE_CONNECTION_SERVICE.handleNewPassiveConnection(client, clientsFromSameNode);
395                 }
396             }
397
398             @Override
399             public void onFailure(final Throwable failureException) {
400                 LOG.error("Probe failed to OVSDB switch. Disconnecting the channel {}", client.getConnectionInfo());
401                 client.disconnect();
402             }
403         }, CONNECTION_NOTIFIER_SERVICE);
404     }
405
406     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
407             justification = "https://github.com/spotbugs/spotbugs/issues/811")
408     private static void handleNewPassiveConnection(final Channel channel) {
409         if (!channel.isOpen()) {
410             LOG.warn("Channel {} is not open, skipped further processing of the connection.",channel);
411             return;
412         }
413         SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
414         if (sslHandler != null) {
415             class HandleNewPassiveSslRunner implements Runnable {
416                 private int retryTimes = 3;
417
418                 private void retry() {
419                     if (retryTimes > 0) {
420                         EXECUTOR_SERVICE.schedule(this,  RETRY_PERIOD, TimeUnit.MILLISECONDS);
421                     } else {
422                         LOG.debug("channel closed {}", channel);
423                         channel.disconnect();
424                     }
425                     retryTimes--;
426                 }
427
428                 @Override
429                 public void run() {
430                     HandshakeStatus status = sslHandler.engine().getHandshakeStatus();
431                     LOG.debug("Handshake status {}", status);
432                     switch (status) {
433                         case FINISHED:
434                         case NOT_HANDSHAKING:
435                             if (sslHandler.engine().getSession().getCipherSuite()
436                                     .equals("SSL_NULL_WITH_NULL_NULL")) {
437                                 // Not begin handshake yet. Retry later.
438                                 LOG.debug("handshake not begin yet {}", status);
439                                 retry();
440                             } else {
441                               //Check if peer is trusted before notifying listeners
442                                 try {
443                                     sslHandler.engine().getSession().getPeerCertificates();
444                                     //Handshake done. Notify listener.
445                                     OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
446                                         SocketConnectionType.SSL);
447                                     handleNewPassiveConnection(client);
448                                 } catch (SSLPeerUnverifiedException e) {
449                                     //Trust manager is still checking peer certificate. Retry later
450                                     LOG.debug("Peer certifiacte is not verified yet {}", status);
451                                     retry();
452                                 }
453                             }
454                             break;
455
456                         case NEED_UNWRAP:
457                         case NEED_TASK:
458                             //Handshake still ongoing. Retry later.
459                             LOG.debug("handshake not done yet {}", status);
460                             retry();
461                             break;
462
463                         case NEED_WRAP:
464                             if (sslHandler.engine().getSession().getCipherSuite()
465                                     .equals("SSL_NULL_WITH_NULL_NULL")) {
466                                 /* peer not authenticated. No need to notify listener in this case. */
467                                 LOG.error("Ssl handshake fail. channel {}", channel);
468                                 channel.disconnect();
469                             } else {
470                                 /*
471                                  * peer is authenticated. Give some time to wait for completion.
472                                  * If status is still NEED_WRAP, client might already disconnect.
473                                  * This happens when the first time client connects to controller in two-way handshake.
474                                  * After obtaining controller certificate, client will disconnect and start
475                                  * new connection with controller certificate it obtained.
476                                  * In this case no need to do anything for the first connection attempt. Just skip
477                                  * since client will reconnect later.
478                                  */
479                                 LOG.debug("handshake not done yet {}", status);
480                                 retry();
481                             }
482                             break;
483
484                         default:
485                             LOG.error("unknown hadshake status {}", status);
486                     }
487                 }
488             }
489
490             EXECUTOR_SERVICE.schedule(new HandleNewPassiveSslRunner(),
491                     RETRY_PERIOD, TimeUnit.MILLISECONDS);
492         } else {
493             EXECUTOR_SERVICE.execute(() -> {
494                 OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
495                     SocketConnectionType.NON_SSL);
496                 handleNewPassiveConnection(client);
497             });
498         }
499     }
500
501     public static void channelClosed(final OvsdbClient client) {
502         LOG.info("Connection closed {}", client.getConnectionInfo());
503         CONNECTIONS.remove(client);
504         if (client.isConnectionPublished()) {
505             for (OvsdbConnectionListener listener : CONNECTION_LISTENERS) {
506                 listener.disconnected(client);
507             }
508         }
509         STALE_PASSIVE_CONNECTION_SERVICE.clientDisconnected(client);
510     }
511
512     @Override
513     public Collection<OvsdbClient> getConnections() {
514         return CONNECTIONS.keySet();
515     }
516
517     @Override
518     public void close() throws Exception {
519         LOG.info("OvsdbConnectionService closed");
520         JsonRpcEndpoint.close();
521     }
522
523     @Override
524     public OvsdbClient getClient(final Channel channel) {
525         for (Entry<OvsdbClient, Channel> entry : CONNECTIONS.entrySet()) {
526             OvsdbClient client = entry.getKey();
527             Channel ctx = entry.getValue();
528             if (ctx.equals(channel)) {
529                 return client;
530             }
531         }
532         return null;
533     }
534
535     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
536             justification = "https://github.com/spotbugs/spotbugs/issues/811")
537     private static List<OvsdbClient> getPassiveClientsFromSameNode(final OvsdbClient ovsdbClient) {
538         List<OvsdbClient> passiveClients = new ArrayList<>();
539         for (OvsdbClient client : CONNECTIONS.keySet()) {
540             if (!client.equals(ovsdbClient)
541                     && client.getConnectionInfo().getRemoteAddress()
542                             .equals(ovsdbClient.getConnectionInfo().getRemoteAddress())
543                     && client.getConnectionInfo().getType() == ConnectionType.PASSIVE) {
544                 passiveClients.add(client);
545             }
546         }
547         return passiveClients;
548     }
549
550     public static void notifyListenerForPassiveConnection(final OvsdbClient client) {
551         client.setConnectionPublished(true);
552         for (final OvsdbConnectionListener listener : CONNECTION_LISTENERS) {
553             CONNECTION_NOTIFIER_SERVICE.execute(() -> {
554                 LOG.trace("Connection {} notified to listener {}", client.getConnectionInfo(), listener);
555                 listener.connected(client);
556             });
557         }
558     }
559
560     public void setOvsdbRpcTaskTimeout(final int timeout) {
561         JsonRpcEndpoint.setReaperInterval(timeout);
562     }
563
564     /**
565      * Set useSSL flag.
566      *
567      * @param flag boolean for using ssl
568      */
569     public void setUseSsl(final boolean flag) {
570         useSSL = flag;
571     }
572
573     /**
574      * Blueprint property setter method. Blueprint call this method and set the value of json rpc decoder
575      * max frame length to the value configured for config option (json-rpc-decoder-max-frame-length) in
576      * the configuration file. This option is only configured at the  boot time of the controller. Any
577      * change at the run time will have no impact.
578      * @param maxFrameLength Max frame length (default : 100000)
579      */
580     public void setJsonRpcDecoderMaxFrameLength(final int maxFrameLength) {
581         jsonRpcDecoderMaxFrameLength = maxFrameLength;
582         LOG.info("Json Rpc Decoder Max Frame Length set to : {}", jsonRpcDecoderMaxFrameLength);
583     }
584
585     public void setOvsdbListenerIp(final String ip) {
586         LOG.info("OVSDB IP for listening connection is set to : {}", ip);
587         listenerIp = ip;
588     }
589
590     public void setOvsdbListenerPort(final int portNumber) {
591         LOG.info("OVSDB port for listening connection is set to : {}", portNumber);
592         listenerPort = portNumber;
593     }
594
595     public void updateConfigParameter(final Map<String, Object> configParameters) {
596         if (configParameters != null && !configParameters.isEmpty()) {
597             LOG.debug("Config parameters received : {}", configParameters.entrySet());
598             for (Map.Entry<String, Object> paramEntry : configParameters.entrySet()) {
599                 if (paramEntry.getKey().equalsIgnoreCase(OVSDB_RPC_TASK_TIMEOUT_PARAM)) {
600                     setOvsdbRpcTaskTimeout(Integer.parseInt((String)paramEntry.getValue()));
601                 } else if (paramEntry.getKey().equalsIgnoreCase(USE_SSL)) {
602                     useSSL = Boolean.parseBoolean(paramEntry.getValue().toString());
603                 }
604             }
605         }
606     }
607 }