* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
private final DataBroker dataBroker;
private final RpcProviderRegistry rpcProviderRegistry;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
+ private final ScheduledExecutorService keepaliveExecutor;
+ private final ListeningExecutorService processingExecutor;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
- this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
- this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
+ this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+ this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
}
@VisibleForTesting
- protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
- ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+ protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+ final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
}
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private static void close(AutoCloseable closeable) {
+ private static void close(final AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception e) {
if (keepaliveDelay > 0) {
LOG.info("{}: Adding keepalive facade.", remoteDeviceId);
salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade,
- netconfTopologyDeviceSetup.getKeepaliveExecutor().getExecutor(), keepaliveDelay,
+ netconfTopologyDeviceSetup.getKeepaliveExecutor(), keepaliveDelay,
defaultRequestTimeoutMillis);
}
device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(reconnectOnChangedSchema)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(netconfTopologyDeviceSetup.getProcessingExecutor().getExecutor())
+ .setGlobalProcessingExecutor(netconfTopologyDeviceSetup.getProcessingExecutor())
.setId(remoteDeviceId)
.setSalFacade(salFacade)
.build();
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.singleton.impl.utils;
import akka.actor.ActorSystem;
+import com.google.common.util.concurrent.ListeningExecutorService;
import io.netty.util.concurrent.EventExecutor;
+import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
-import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
-import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
private final DataBroker dataBroker;
private final InstanceIdentifier<Node> instanceIdentifier;
private final Node node;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
+ private final ScheduledExecutorService keepaliveExecutor;
+ private final ListeningExecutorService processingExecutor;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher netconfClientDispatcher;
return node;
}
- public ThreadPool getProcessingExecutor() {
+ public ListeningExecutorService getProcessingExecutor() {
return processingExecutor;
}
- public ScheduledThreadPool getKeepaliveExecutor() {
+ public ScheduledExecutorService getKeepaliveExecutor() {
return keepaliveExecutor;
}
private DataBroker dataBroker;
private InstanceIdentifier<Node> instanceIdentifier;
private Node node;
- private ScheduledThreadPool keepaliveExecutor;
- private ThreadPool processingExecutor;
+ private ScheduledExecutorService keepaliveExecutor;
+ private ListeningExecutorService processingExecutor;
private ActorSystem actorSystem;
private EventExecutor eventExecutor;
private String topologyId;
return new NetconfTopologySetup(this);
}
- private ScheduledThreadPool getKeepaliveExecutor() {
+ private ScheduledExecutorService getKeepaliveExecutor() {
return keepaliveExecutor;
}
- public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledThreadPool keepaliveExecutor) {
+ public NetconfTopologySetupBuilder setKeepaliveExecutor(final ScheduledExecutorService keepaliveExecutor) {
this.keepaliveExecutor = keepaliveExecutor;
return this;
}
- private ThreadPool getProcessingExecutor() {
+ private ListeningExecutorService getProcessingExecutor() {
return processingExecutor;
}
- public NetconfTopologySetupBuilder setProcessingExecutor(final ThreadPool processingExecutor) {
+ public NetconfTopologySetupBuilder setProcessingExecutor(final ListeningExecutorService processingExecutor) {
this.processingExecutor = processingExecutor;
return this;
}
return idleTimeout;
}
- public NetconfTopologySetupBuilder setPrivateKeyPath(String privateKeyPath) {
+ public NetconfTopologySetupBuilder setPrivateKeyPath(final String privateKeyPath) {
this.privateKeyPath = privateKeyPath;
return this;
}
return this.privateKeyPath;
}
- public NetconfTopologySetupBuilder setPrivateKeyPassphrase(String privateKeyPassphrase) {
+ public NetconfTopologySetupBuilder setPrivateKeyPassphrase(final String privateKeyPassphrase) {
this.privateKeyPassphrase = privateKeyPassphrase;
return this;
}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
final RpcProviderRegistry rpcProviderRegistry = mock(RpcProviderRegistry.class);
final ScheduledThreadPool keepaliveExecutor = mock(ScheduledThreadPool.class);
- final ThreadPool processingExecutor = mock(ThreadPool.class);
+ final ThreadPool processingThreadPool = mock(ThreadPool.class);
+ final ExecutorService processingService = mock(ExecutorService.class);
+ doReturn(processingService).when(processingThreadPool).getExecutor();
final ActorSystemProvider actorSystemProvider = mock(ActorSystemProvider.class);
final EventExecutor eventExecutor = mock(EventExecutor.class);
final NetconfClientDispatcher clientDispatcher = mock(NetconfClientDispatcher.class);
final Config config = new ConfigBuilder().setWriteTransactionIdleTimeout(0).build();
netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
- clusterSingletonServiceProvider, keepaliveExecutor, processingExecutor,
+ clusterSingletonServiceProvider, keepaliveExecutor, processingThreadPool,
actorSystemProvider, eventExecutor, clientDispatcher, TOPOLOGY_ID, config,
mountPointService, encryptionService) {
@Override
- protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
- ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+ protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+ final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
assertEquals(ACTOR_RESPONSE_WAIT_TIME, actorResponseWaitTime.duration().toSeconds());
return Objects.requireNonNull(mockContextMap.get(setup.getInstanceIdentifier()),
"No mock context for " + setup.getInstanceIdentifier()).apply(setup);
import akka.actor.ActorSystem;
import akka.util.Timeout;
import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.ListeningExecutorService;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
-import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
-import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
@Mock
- private ScheduledThreadPool keepaliveExecutor;
+ private ScheduledExecutorService keepaliveExecutor;
@Mock
- private ThreadPool processingExecutor;
+ private ListeningExecutorService processingExecutor;
@Mock
private ActorSystem actorSystem;
@SuppressWarnings("unchecked")
@Test
public void testKeapAliveFacade() {
- final ExecutorService executorService = mock(ExecutorService.class);
- doReturn(executorService).when(processingExecutor).getExecutor();
-
final Credentials credentials = new LoginPasswordBuilder()
.setPassword("admin").setUsername("admin").build();
final NetconfNode netconfNode = new NetconfNodeBuilder()
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.handler.ssl.SslHandler;
private final DeviceActionFactory deviceActionFactory;
private final NetconfKeystoreAdapter keystoreAdapter;
protected final ScheduledThreadPool keepaliveExecutor;
- protected final ThreadPool processingExecutor;
+ protected final ListeningExecutorService processingExecutor;
protected final SharedSchemaRepository sharedSchemaRepository;
protected final DataBroker dataBroker;
protected final DOMMountPointService mountPointService;
this.clientDispatcher = clientDispatcher;
this.eventExecutor = eventExecutor;
this.keepaliveExecutor = keepaliveExecutor;
- this.processingExecutor = processingExecutor;
+ this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
this.deviceActionFactory = deviceActionFactory;
this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
this.dataBroker = dataBroker;
NetconfDeviceBuilder netconfDeviceBuilder = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(reconnectOnChangedSchema)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(this.processingExecutor.getExecutor())
+ .setGlobalProcessingExecutor(this.processingExecutor)
.setId(remoteDeviceId)
.setSalFacade(salFacade);
if (this.deviceActionFactory != null) {
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.impl;
import com.google.common.util.concurrent.FutureCallback;
*/
package org.opendaylight.netconf.sal.connect.netconf;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id,
final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
+ final ListeningExecutorService globalProcessingExecutor,
+ final boolean reconnectOnSchemasChange) {
this(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, reconnectOnSchemasChange, null);
}
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id,
final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
- final ExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
+ final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
final DeviceActionFactory deviceActionFactory) {
this.id = id;
this.reconnectOnSchemasChange = reconnectOnSchemasChange;
this.schemaContextFactory = schemaResourcesDTO.getSchemaContextFactory();
this.salFacade = salFacade;
this.stateSchemasResolver = schemaResourcesDTO.getStateSchemasResolver();
- this.processingExecutor = MoreExecutors.listeningDecorator(globalProcessingExecutor);
+ this.processingExecutor = requireNonNull(globalProcessingExecutor);
this.notificationHandler = new NotificationHandler(salFacade, id);
}
package org.opendaylight.netconf.sal.connect.netconf;
import com.google.common.base.Preconditions;
-import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
private NetconfDevice.SchemaResourcesDTO schemaResourcesDTO;
private RemoteDeviceId id;
private RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
- private ExecutorService globalProcessingExecutor;
+ private ListeningExecutorService globalProcessingExecutor;
private DeviceActionFactory deviceActionFactory;
public NetconfDeviceBuilder() {
}
- public NetconfDeviceBuilder setReconnectOnSchemasChange(boolean reconnectOnSchemasChange) {
+ public NetconfDeviceBuilder setReconnectOnSchemasChange(final boolean reconnectOnSchemasChange) {
this.reconnectOnSchemasChange = reconnectOnSchemasChange;
return this;
}
- public NetconfDeviceBuilder setId(RemoteDeviceId id) {
+ public NetconfDeviceBuilder setId(final RemoteDeviceId id) {
this.id = id;
return this;
}
- public NetconfDeviceBuilder setSchemaResourcesDTO(NetconfDevice.SchemaResourcesDTO schemaResourcesDTO) {
+ public NetconfDeviceBuilder setSchemaResourcesDTO(final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO) {
this.schemaResourcesDTO = schemaResourcesDTO;
return this;
}
- public NetconfDeviceBuilder setSalFacade(RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
+ public NetconfDeviceBuilder setSalFacade(final RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
this.salFacade = salFacade;
return this;
}
- public NetconfDeviceBuilder setGlobalProcessingExecutor(ExecutorService globalProcessingExecutor) {
+ public NetconfDeviceBuilder setGlobalProcessingExecutor(final ListeningExecutorService globalProcessingExecutor) {
this.globalProcessingExecutor = globalProcessingExecutor;
return this;
}
- public NetconfDeviceBuilder setDeviceActionFactory(DeviceActionFactory deviceActionFactory) {
+ public NetconfDeviceBuilder setDeviceActionFactory(final DeviceActionFactory deviceActionFactory) {
this.deviceActionFactory = deviceActionFactory;
return this;
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.sal.connect.netconf;
import static org.junit.Assert.assertEquals;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
return new RemoteDeviceId("test-D", InetSocketAddress.createUnresolved("localhost", 22));
}
- public ExecutorService getExecutor() {
- return Executors.newSingleThreadExecutor();
+ public ListeningExecutorService getExecutor() {
+ return MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
}
public MessageTransformer<NetconfMessage> getMessageTransformer() throws Exception {