Remove use of ThreadPools in sal-netconf-connector 54/76554/2
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 1 Oct 2018 16:20:15 +0000 (18:20 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 1 Oct 2018 22:36:34 +0000 (22:36 +0000)
We really just need a {Listening,Scheduled}ExecutorService, nothing
else. Propagating this outwords from NetconfDevice(Builder), eliminates
the need for per-session ListeningDecorator and proxies, saving us
at least 56 bytes for each connected device.

Change-Id: Ide69bd1a8f36eca1473313e3fd2cafb76d754464
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit ec116112a7ac49a718ca81a21484201dbfa1c836)

netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImpl.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologySetup.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImplTest.java
netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java
netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyImpl.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDeviceBuilder.java
netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDeviceTest.java

index a71f7cb23f832a284c72b14867a6672691855a37..9dd0a22da18dd17fa1d17f7d1116d5bdec9923ff 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.topology.singleton.impl;
 
 import akka.actor.ActorSystem;
@@ -13,11 +12,13 @@ import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.EventExecutor;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
@@ -70,8 +71,8 @@ public class NetconfTopologyManager
     private final DataBroker dataBroker;
     private final RpcProviderRegistry rpcProviderRegistry;
     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
-    private final ScheduledThreadPool keepaliveExecutor;
-    private final ThreadPool processingExecutor;
+    private final ScheduledExecutorService keepaliveExecutor;
+    private final ListeningExecutorService processingExecutor;
     private final ActorSystem actorSystem;
     private final EventExecutor eventExecutor;
     private final NetconfClientDispatcher clientDispatcher;
@@ -95,8 +96,8 @@ public class NetconfTopologyManager
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
-        this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
-        this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
+        this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+        this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
@@ -197,8 +198,8 @@ public class NetconfTopologyManager
     }
 
     @VisibleForTesting
-    protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
-            ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+    protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+            final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
     }
 
@@ -217,7 +218,7 @@ public class NetconfTopologyManager
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private static void close(AutoCloseable closeable) {
+    private static void close(final AutoCloseable closeable) {
         try {
             closeable.close();
         } catch (Exception e) {
index 68366c115cf3c82e51ca5958c2345266d27500c8..78820d5c7e82ce23fadc39142f0d96f453ef19df 100644 (file)
@@ -159,7 +159,7 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
         if (keepaliveDelay > 0) {
             LOG.info("{}: Adding keepalive facade.", remoteDeviceId);
             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade,
-                    netconfTopologyDeviceSetup.getKeepaliveExecutor().getExecutor(), keepaliveDelay,
+                    netconfTopologyDeviceSetup.getKeepaliveExecutor(), keepaliveDelay,
                     defaultRequestTimeoutMillis);
         }
 
@@ -200,7 +200,7 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
             device = new NetconfDeviceBuilder()
                     .setReconnectOnSchemasChange(reconnectOnChangedSchema)
                     .setSchemaResourcesDTO(schemaResourcesDTO)
-                    .setGlobalProcessingExecutor(netconfTopologyDeviceSetup.getProcessingExecutor().getExecutor())
+                    .setGlobalProcessingExecutor(netconfTopologyDeviceSetup.getProcessingExecutor())
                     .setId(remoteDeviceId)
                     .setSalFacade(salFacade)
                     .build();
index 65daa1544e28edddb8fe7066a2b85f47ff2bf812..e821f569088811854b5429ee2fa780a4560c976c 100644 (file)
@@ -5,14 +5,13 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.topology.singleton.impl.utils;
 
 import akka.actor.ActorSystem;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import io.netty.util.concurrent.EventExecutor;
+import java.util.concurrent.ScheduledExecutorService;
 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
-import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
-import org.opendaylight.controller.config.threadpool.ThreadPool;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
@@ -29,8 +28,8 @@ public class NetconfTopologySetup {
     private final DataBroker dataBroker;
     private final InstanceIdentifier<Node> instanceIdentifier;
     private final Node node;
-    private final ScheduledThreadPool keepaliveExecutor;
-    private final ThreadPool processingExecutor;
+    private final ScheduledExecutorService keepaliveExecutor;
+    private final ListeningExecutorService processingExecutor;
     private final ActorSystem actorSystem;
     private final EventExecutor eventExecutor;
     private final NetconfClientDispatcher netconfClientDispatcher;
@@ -80,11 +79,11 @@ public class NetconfTopologySetup {
         return node;
     }
 
-    public ThreadPool getProcessingExecutor() {
+    public ListeningExecutorService getProcessingExecutor() {
         return processingExecutor;
     }
 
-    public ScheduledThreadPool getKeepaliveExecutor() {
+    public ScheduledExecutorService getKeepaliveExecutor() {
         return keepaliveExecutor;
     }
 
@@ -131,8 +130,8 @@ public class NetconfTopologySetup {
         private DataBroker dataBroker;
         private InstanceIdentifier<Node> instanceIdentifier;
         private Node node;
-        private ScheduledThreadPool keepaliveExecutor;
-        private ThreadPool processingExecutor;
+        private ScheduledExecutorService keepaliveExecutor;
+        private ListeningExecutorService processingExecutor;
         private ActorSystem actorSystem;
         private EventExecutor eventExecutor;
         private String topologyId;
@@ -196,20 +195,20 @@ public class NetconfTopologySetup {
             return new NetconfTopologySetup(this);
         }
 
-        private ScheduledThreadPool getKeepaliveExecutor() {
+        private ScheduledExecutorService getKeepaliveExecutor() {
             return keepaliveExecutor;
         }
 
-        public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledThreadPool keepaliveExecutor) {
+        public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledExecutorService keepaliveExecutor) {
             this.keepaliveExecutor = keepaliveExecutor;
             return this;
         }
 
-        private ThreadPool getProcessingExecutor() {
+        private ListeningExecutorService getProcessingExecutor() {
             return processingExecutor;
         }
 
-        public NetconfTopologySetupBuilder setProcessingExecutor(final ThreadPool processingExecutor) {
+        public NetconfTopologySetupBuilder setProcessingExecutor(final ListeningExecutorService processingExecutor) {
             this.processingExecutor = processingExecutor;
             return this;
         }
@@ -269,7 +268,7 @@ public class NetconfTopologySetup {
             return idleTimeout;
         }
 
-        public NetconfTopologySetupBuilder setPrivateKeyPath(String privateKeyPath) {
+        public NetconfTopologySetupBuilder setPrivateKeyPath(final String privateKeyPath) {
             this.privateKeyPath = privateKeyPath;
             return this;
         }
@@ -278,7 +277,7 @@ public class NetconfTopologySetup {
             return this.privateKeyPath;
         }
 
-        public NetconfTopologySetupBuilder setPrivateKeyPassphrase(String privateKeyPassphrase) {
+        public NetconfTopologySetupBuilder setPrivateKeyPassphrase(final String privateKeyPassphrase) {
             this.privateKeyPassphrase = privateKeyPassphrase;
             return this;
         }
index d2c3638f696605c9ada1df859efbf9772e99faf2..34523ac453851e83ead492acd57d878f7dc4aa67 100644 (file)
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import javax.annotation.Nonnull;
@@ -111,7 +112,9 @@ public class NetconfTopologyManagerTest {
 
         final RpcProviderRegistry rpcProviderRegistry = mock(RpcProviderRegistry.class);
         final ScheduledThreadPool keepaliveExecutor = mock(ScheduledThreadPool.class);
-        final ThreadPool processingExecutor = mock(ThreadPool.class);
+        final ThreadPool processingThreadPool = mock(ThreadPool.class);
+        final ExecutorService processingService = mock(ExecutorService.class);
+        doReturn(processingService).when(processingThreadPool).getExecutor();
         final ActorSystemProvider actorSystemProvider = mock(ActorSystemProvider.class);
         final EventExecutor eventExecutor = mock(EventExecutor.class);
         final NetconfClientDispatcher clientDispatcher = mock(NetconfClientDispatcher.class);
@@ -120,12 +123,12 @@ public class NetconfTopologyManagerTest {
 
         final Config config = new ConfigBuilder().setWriteTransactionIdleTimeout(0).build();
         netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
-                clusterSingletonServiceProvider, keepaliveExecutor, processingExecutor,
+                clusterSingletonServiceProvider, keepaliveExecutor, processingThreadPool,
                 actorSystemProvider, eventExecutor, clientDispatcher, TOPOLOGY_ID, config,
                 mountPointService, encryptionService) {
             @Override
-            protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
-                    ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+            protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+                    final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
                 assertEquals(ACTOR_RESPONSE_WAIT_TIME, actorResponseWaitTime.duration().toSeconds());
                 return Objects.requireNonNull(mockContextMap.get(setup.getInstanceIdentifier()),
                         "No mock context for " + setup.getInstanceIdentifier()).apply(setup);
index ec825d729aca86aea028ba4e06470280c78eb834..5144ace86ab32494e16f5200e76c88a2407bf85e 100644 (file)
@@ -24,14 +24,13 @@ import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import io.netty.util.concurrent.EventExecutor;
 import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
-import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
-import org.opendaylight.controller.config.threadpool.ThreadPool;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
@@ -81,10 +80,10 @@ public class RemoteDeviceConnectorImplTest {
     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
 
     @Mock
-    private ScheduledThreadPool keepaliveExecutor;
+    private ScheduledExecutorService keepaliveExecutor;
 
     @Mock
-    private ThreadPool processingExecutor;
+    private ListeningExecutorService processingExecutor;
 
     @Mock
     private ActorSystem actorSystem;
@@ -169,9 +168,6 @@ public class RemoteDeviceConnectorImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testKeapAliveFacade() {
-        final ExecutorService executorService = mock(ExecutorService.class);
-        doReturn(executorService).when(processingExecutor).getExecutor();
-
         final Credentials credentials = new LoginPasswordBuilder()
                 .setPassword("admin").setUsername("admin").build();
         final NetconfNode netconfNode = new NetconfNodeBuilder()
index 818a3895dc0fe2944d3c35a1cfaf47f7393c6922..9b057f06195202b444e9f2b19dfdbbbf091564cc 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.handler.ssl.SslHandler;
@@ -206,7 +207,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
     private final DeviceActionFactory deviceActionFactory;
     private final NetconfKeystoreAdapter keystoreAdapter;
     protected final ScheduledThreadPool keepaliveExecutor;
-    protected final ThreadPool processingExecutor;
+    protected final ListeningExecutorService processingExecutor;
     protected final SharedSchemaRepository sharedSchemaRepository;
     protected final DataBroker dataBroker;
     protected final DOMMountPointService mountPointService;
@@ -230,7 +231,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
         this.clientDispatcher = clientDispatcher;
         this.eventExecutor = eventExecutor;
         this.keepaliveExecutor = keepaliveExecutor;
-        this.processingExecutor = processingExecutor;
+        this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
         this.deviceActionFactory = deviceActionFactory;
         this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
         this.dataBroker = dataBroker;
@@ -363,7 +364,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
             NetconfDeviceBuilder netconfDeviceBuilder = new NetconfDeviceBuilder()
                     .setReconnectOnSchemasChange(reconnectOnChangedSchema)
                     .setSchemaResourcesDTO(schemaResourcesDTO)
-                    .setGlobalProcessingExecutor(this.processingExecutor.getExecutor())
+                    .setGlobalProcessingExecutor(this.processingExecutor)
                     .setId(remoteDeviceId)
                     .setSalFacade(salFacade);
             if (this.deviceActionFactory != null) {
index ebfd283e762da1bc3de2ef3aaf83a0e4b3e3a07b..4a27320634cef58e6fcd3647d6a06e7d94b1e81a 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.topology.impl;
 
 import com.google.common.util.concurrent.FutureCallback;
index 993b1230bda2f51e48598ec8de68cc7dcb2a2fcc..30b5e94fc18e2c2102119fc91221af1a5fea8cc8 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netconf.sal.connect.netconf;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -27,7 +29,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
@@ -118,13 +119,14 @@ public class NetconfDevice
 
     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id,
                          final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
-                         final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
+                         final ListeningExecutorService globalProcessingExecutor,
+                         final boolean reconnectOnSchemasChange) {
         this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, reconnectOnSchemasChange, null);
     }
 
     public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id,
             final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
-            final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
+            final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
             final DeviceActionFactory deviceActionFactory) {
         this.id = id;
         this.reconnectOnSchemasChange = reconnectOnSchemasChange;
@@ -134,7 +136,7 @@ public class NetconfDevice
         this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
         this.salFacade = salFacade;
         this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
-        this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
+        this.processingExecutor = requireNonNull(globalProcessingExecutor);
         this.notificationHandler = new NotificationHandler(salFacade, id);
     }
 
index 199f0c09f768ec5b952ef2d6d987e02ae7adb450..50a1e9ade98a7ae9a7c74861f25fd636692b3fff 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.netconf.sal.connect.netconf;
 
 import com.google.common.base.Preconditions;
-import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
@@ -21,38 +21,38 @@ public class NetconfDeviceBuilder {
     private NetconfDevice.SchemaResourcesDTO schemaResourcesDTO;
     private RemoteDeviceId id;
     private RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
-    private ExecutorService globalProcessingExecutor;
+    private ListeningExecutorService globalProcessingExecutor;
     private DeviceActionFactory deviceActionFactory;
 
     public NetconfDeviceBuilder() {
     }
 
-    public NetconfDeviceBuilder setReconnectOnSchemasChange(boolean reconnectOnSchemasChange) {
+    public NetconfDeviceBuilder setReconnectOnSchemasChange(final boolean reconnectOnSchemasChange) {
         this.reconnectOnSchemasChange = reconnectOnSchemasChange;
         return this;
     }
 
-    public NetconfDeviceBuilder setId(RemoteDeviceId id) {
+    public NetconfDeviceBuilder setId(final RemoteDeviceId id) {
         this.id = id;
         return this;
     }
 
-    public NetconfDeviceBuilder setSchemaResourcesDTO(NetconfDevice.SchemaResourcesDTO schemaResourcesDTO) {
+    public NetconfDeviceBuilder setSchemaResourcesDTO(final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO) {
         this.schemaResourcesDTO = schemaResourcesDTO;
         return this;
     }
 
-    public NetconfDeviceBuilder setSalFacade(RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
+    public NetconfDeviceBuilder setSalFacade(final RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
         this.salFacade = salFacade;
         return this;
     }
 
-    public NetconfDeviceBuilder setGlobalProcessingExecutor(ExecutorService globalProcessingExecutor) {
+    public NetconfDeviceBuilder setGlobalProcessingExecutor(final ListeningExecutorService globalProcessingExecutor) {
         this.globalProcessingExecutor = globalProcessingExecutor;
         return this;
     }
 
-    public NetconfDeviceBuilder setDeviceActionFactory(DeviceActionFactory deviceActionFactory) {
+    public NetconfDeviceBuilder setDeviceActionFactory(final DeviceActionFactory deviceActionFactory) {
         this.deviceActionFactory = deviceActionFactory;
         return this;
     }
index 0d3ca2dab49aeb8b7e6cfc0186d62e328749746f..a7e0de26dbd283ee8b2e429e1e8c4df493f9c9d1 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.sal.connect.netconf;
 
 import static org.junit.Assert.assertEquals;
@@ -26,6 +25,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -34,7 +35,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -451,8 +451,8 @@ public class NetconfDeviceTest {
         return new RemoteDeviceId("test-D", InetSocketAddress.createUnresolved("localhost", 22));
     }
 
-    public ExecutorService getExecutor() {
-        return Executors.newSingleThreadExecutor();
+    public ListeningExecutorService getExecutor() {
+        return MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
     }
 
     public MessageTransformer<NetconfMessage> getMessageTransformer() throws Exception {