From f18f86a82f6fc46426e6f198bdf7625a17063878 Mon Sep 17 00:00:00 2001 From: Tomas Slusny Date: Tue, 24 May 2016 10:14:52 +0200 Subject: [PATCH] Bug 5924 - Reuse Threads using ThreadPool in SystemNotificationListenerImpl - Removed creating new thread every time switch was connected - Instead, used thread pool created in Bug 5925 and passed it from ConnectionManagerImpl to SystemNotificationListenerImpl Change-Id: I02f50a9720b659b910250feab08553e142acaf44 Signed-off-by: Tomas Slusny --- .../connection/ConnectionManagerImpl.java | 2 +- .../SystemNotificationsListenerImpl.java | 92 ++++++++++--------- .../SystemNotificationsListenerImplTest.java | 10 +- 3 files changed, 57 insertions(+), 47 deletions(-) diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImpl.java index 05bb534fed..6450e6d647 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImpl.java @@ -69,7 +69,7 @@ public class ConnectionManagerImpl implements ConnectionManager { new OpenflowProtocolListenerInitialImpl(connectionContext, handshakeContext); connectionAdapter.setMessageListener(ofMessageListener); - final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl(connectionContext, echoReplyTimeout); + final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl(connectionContext, echoReplyTimeout, threadPool); connectionAdapter.setSystemListener(systemListener); LOG.trace("connection ballet finished"); diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImpl.java index 1d99f8073f..ae169a021f 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImpl.java @@ -12,6 +12,7 @@ import com.google.common.base.Preconditions; import java.net.InetSocketAddress; import java.util.Date; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; @@ -37,8 +38,12 @@ public class SystemNotificationsListenerImpl implements SystemNotificationsListe @VisibleForTesting static final long MAX_ECHO_REPLY_TIMEOUT = 2000; private final long echoReplyTimeout; + private final ThreadPoolExecutor threadPool; - public SystemNotificationsListenerImpl(@Nonnull final ConnectionContext connectionContext, long echoReplyTimeout) { + public SystemNotificationsListenerImpl(@Nonnull final ConnectionContext connectionContext, + long echoReplyTimeout, + @Nonnull final ThreadPoolExecutor threadPool) { + this.threadPool = threadPool; this.connectionContext = Preconditions.checkNotNull(connectionContext); this.echoReplyTimeout = echoReplyTimeout; } @@ -52,62 +57,59 @@ public class SystemNotificationsListenerImpl implements SystemNotificationsListe @Override public void onSwitchIdleEvent(final SwitchIdleEvent notification) { - new Thread(new Runnable() { - @Override - public void run() { - boolean shouldBeDisconnected = true; + threadPool.execute(() -> { + boolean shouldBeDisconnected = true; - final InetSocketAddress remoteAddress = connectionContext.getConnectionAdapter().getRemoteAddress(); + final InetSocketAddress remoteAddress = connectionContext.getConnectionAdapter().getRemoteAddress(); - if (ConnectionContext.CONNECTION_STATE.WORKING.equals(connectionContext.getConnectionState())) { - FeaturesReply features = connectionContext.getFeatures(); - LOG.info("Switch Idle state occurred, node={}|auxId={}", remoteAddress, features.getAuxiliaryId()); - connectionContext.changeStateToTimeouting(); - EchoInputBuilder builder = new EchoInputBuilder(); - builder.setVersion(features.getVersion()); - Xid xid = new Xid(0L); - builder.setXid(xid.getValue()); + if (ConnectionContext.CONNECTION_STATE.WORKING.equals(connectionContext.getConnectionState())) { + FeaturesReply features = connectionContext.getFeatures(); + LOG.info("Switch Idle state occurred, node={}|auxId={}", remoteAddress, features.getAuxiliaryId()); + connectionContext.changeStateToTimeouting(); + EchoInputBuilder builder = new EchoInputBuilder(); + builder.setVersion(features.getVersion()); + Xid xid = new Xid(0L); + builder.setXid(xid.getValue()); - Future> echoReplyFuture = connectionContext.getConnectionAdapter().echo(builder.build()); + Future> echoReplyFuture = connectionContext.getConnectionAdapter().echo(builder.build()); - try { - RpcResult echoReplyValue = echoReplyFuture.get(echoReplyTimeout, TimeUnit.MILLISECONDS); - if (echoReplyValue.isSuccessful()) { - connectionContext.changeStateToWorking(); - shouldBeDisconnected = false; - } else { - for (RpcError replyError : echoReplyValue.getErrors()) { - Throwable cause = replyError.getCause(); - if (LOG.isWarnEnabled()) { - LOG.warn("Received EchoReply from [{}] in TIMEOUTING state, Error:{}", remoteAddress, cause.getMessage()); - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Received EchoReply from [{}] in TIMEOUTING state, Error:{}", remoteAddress, cause); - } + try { + RpcResult echoReplyValue = echoReplyFuture.get(echoReplyTimeout, TimeUnit.MILLISECONDS); + if (echoReplyValue.isSuccessful()) { + connectionContext.changeStateToWorking(); + shouldBeDisconnected = false; + } else { + for (RpcError replyError : echoReplyValue.getErrors()) { + Throwable cause = replyError.getCause(); + if (LOG.isWarnEnabled()) { + LOG.warn("Received EchoReply from [{}] in TIMEOUTING state, Error:{}", remoteAddress, cause.getMessage()); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Received EchoReply from [{}] in TIMEOUTING state, Error:{}", remoteAddress, cause); } - } - } catch (Exception e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Exception while waiting for echoReply from [{}] in TIMEOUTING state: {}", remoteAddress, e.getMessage()); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Exception while waiting for echoReply from [{}] in TIMEOUTING state: {}", remoteAddress, e); } - } - } - if (shouldBeDisconnected) { - if (LOG.isInfoEnabled()) { - LOG.info("ConnectionEvent:Closing connection as device is idle. Echo sent at {}. Device:{}, NodeId:{}", - new Date(System.currentTimeMillis() - echoReplyTimeout), remoteAddress, connectionContext.getNodeId()); + } catch (Exception e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Exception while waiting for echoReply from [{}] in TIMEOUTING state: {}", remoteAddress, e.getMessage()); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Exception while waiting for echoReply from [{}] in TIMEOUTING state: {}", remoteAddress, e); } - connectionContext.closeConnection(true); } } - }).start(); + if (shouldBeDisconnected) { + if (LOG.isInfoEnabled()) { + LOG.info("ConnectionEvent:Closing connection as device is idle. Echo sent at {}. Device:{}, NodeId:{}", + new Date(System.currentTimeMillis() - echoReplyTimeout), remoteAddress, connectionContext.getNodeId()); + } + + connectionContext.closeConnection(true); + } + }); } } diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImplTest.java index a361d61ac9..3b051fc25e 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/SystemNotificationsListenerImplTest.java @@ -11,6 +11,9 @@ package org.opendaylight.openflowplugin.impl.connection.listener; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import java.net.InetSocketAddress; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -21,6 +24,7 @@ import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.impl.connection.ConnectionContextImpl; +import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput; @@ -66,7 +70,11 @@ public class SystemNotificationsListenerImplTest { Mockito.when(connectionContext.getConnectionAdapter()).thenReturn(connectionAdapter); Mockito.when(connectionContext.getFeatures()).thenReturn(features); - systemNotificationsListener = new SystemNotificationsListenerImpl(connectionContext, ECHO_REPLY_TIMEOUT); + final ThreadPoolLoggingExecutor threadPool = new ThreadPoolLoggingExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), "opfpool"); + + systemNotificationsListener = new SystemNotificationsListenerImpl(connectionContext, ECHO_REPLY_TIMEOUT, threadPool); } @After -- 2.36.6