import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
+import java.util.concurrent.Executor;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.mdsal.binding.api.DataBroker;
private final BaseNetconfSchemas baseSchemas;
private final NetconfClientConfigurationBuilderFactory builderFactory;
private final ScheduledThreadPool keepaliveExecutor;
- private final ListeningExecutorService processingExecutor;
+ private final Executor processingExecutor;
private final DataBroker dataBroker;
private final DOMMountPointService mountPointService;
private final RemoteDeviceId remoteDeviceId;
this.clientDispatcher = requireNonNull(clientDispatcher);
this.eventExecutor = requireNonNull(eventExecutor);
this.keepaliveExecutor = requireNonNull(keepaliveExecutor);
- // FIXME: share a single instance!
- this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
+ this.processingExecutor = processingExecutor.getExecutor();
this.schemaManager = requireNonNull(schemaManager);
this.dataBroker = requireNonNull(dataBroker);
this.mountPointService = requireNonNull(mountPointService);
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
private final ScheduledThreadPool keepaliveExecutor;
private final ScheduledExecutorService keepaliveExecutorService;
private final ThreadPool processingExecutor;
- private final ListeningExecutorService processingExecutorService;
+ private final Executor processingExecutorService;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
this.keepaliveExecutor = keepaliveExecutor;
keepaliveExecutorService = keepaliveExecutor.getExecutor();
this.processingExecutor = processingExecutor;
- processingExecutorService = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
+ processingExecutorService = processingExecutor.getExecutor();
actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
this.eventExecutor = requireNonNull(eventExecutor);
this.clientDispatcher = requireNonNull(clientDispatcher);
import static java.util.Objects.requireNonNull;
import akka.actor.ActorSystem;
-import com.google.common.util.concurrent.ListeningExecutorService;
import io.netty.util.concurrent.EventExecutor;
import java.time.Duration;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
private final InstanceIdentifier<Node> instanceIdentifier;
private final Node node;
private final ScheduledExecutorService keepaliveExecutor;
- private final ListeningExecutorService processingExecutor;
+ private final Executor processingExecutor;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher netconfClientDispatcher;
return node;
}
- public ListeningExecutorService getProcessingExecutor() {
+ public Executor getProcessingExecutor() {
return processingExecutor;
}
private InstanceIdentifier<Node> instanceIdentifier;
private Node node;
private ScheduledExecutorService keepaliveExecutor;
- private ListeningExecutorService processingExecutor;
+ private Executor processingExecutor;
private ActorSystem actorSystem;
private EventExecutor eventExecutor;
private String topologyId;
return this;
}
- ListeningExecutorService getProcessingExecutor() {
+ Executor getProcessingExecutor() {
return processingExecutor;
}
- public NetconfTopologySetupBuilder setProcessingExecutor(final ListeningExecutorService processingExecutor) {
+ public NetconfTopologySetupBuilder setProcessingExecutor(final Executor processingExecutor) {
this.processingExecutor = processingExecutor;
return this;
}
import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
import org.checkerframework.checker.lock.qual.Holding;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
private final BaseNetconfSchemas baseSchemas;
private final NetconfClientConfigurationBuilderFactory builderFactory;
- protected final ScheduledThreadPool keepaliveExecutor;
- protected final ListeningExecutorService processingExecutor;
+ protected final ScheduledExecutorService keepaliveExecutor;
+ protected final Executor processingExecutor;
protected final DataBroker dataBroker;
protected final DOMMountPointService mountPointService;
protected final String topologyId;
this.topologyId = requireNonNull(topologyId);
this.clientDispatcher = clientDispatcher;
this.eventExecutor = eventExecutor;
- this.keepaliveExecutor = keepaliveExecutor;
- this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
+ this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+ this.processingExecutor = processingExecutor.getExecutor();
this.schemaManager = requireNonNull(schemaManager);
this.deviceActionFactory = deviceActionFactory;
this.dataBroker = requireNonNull(dataBroker);
// Instantiate the handler ...
final var deviceId = NetconfNodeUtils.toRemoteDeviceId(nodeId, netconfNode);
final var deviceSalFacade = createSalFacade(deviceId, netconfNode.requireLockDatastore());
- final var nodeHandler = new NetconfNodeHandler(clientDispatcher, eventExecutor, keepaliveExecutor.getExecutor(),
+ final var nodeHandler = new NetconfNodeHandler(clientDispatcher, eventExecutor, keepaliveExecutor,
baseSchemas, schemaManager, processingExecutor, builderFactory, deviceActionFactory, deviceSalFacade,
deviceId, nodeId, netconfNode, nodeOptional);
import static java.util.Objects.requireNonNull;
-import com.google.common.util.concurrent.ListeningExecutorService;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
public NetconfNodeHandler(final NetconfClientDispatcher clientDispatcher, final EventExecutor eventExecutor,
final ScheduledExecutorService keepaliveExecutor, final BaseNetconfSchemas baseSchemas,
- final SchemaResourceManager schemaManager, final ListeningExecutorService processingExecutor,
+ final SchemaResourceManager schemaManager, final Executor processingExecutor,
final NetconfClientConfigurationBuilderFactory builderFactory,
final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.Serial;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
private static final QName RFC8528_SCHEMA_MOUNTS_QNAME = QName.create(
SchemaMountConstants.RFC8528_MODULE, "schema-mounts").intern();
- private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.create(
+ private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.of(
NodeIdentifier.create(RFC8528_SCHEMA_MOUNTS_QNAME));
protected final RemoteDeviceId id;
protected final List<Registration> sourceRegistrations = new ArrayList<>();
private final RemoteDeviceHandler salFacade;
- private final ListeningExecutorService processingExecutor;
+ private final Executor processingExecutor;
private final DeviceActionFactory deviceActionFactory;
private final NetconfDeviceSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
private NetconfMessageTransformer messageTransformer;
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
- final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
+ final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final Executor globalProcessingExecutor,
+ final boolean reconnectOnSchemasChange) {
this(schemaResourcesDTO, baseSchemas, id, salFacade, globalProcessingExecutor, reconnectOnSchemasChange, null);
}
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
- final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
- final DeviceActionFactory deviceActionFactory) {
+ final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final Executor globalProcessingExecutor,
+ final boolean reconnectOnSchemasChange, final DeviceActionFactory deviceActionFactory) {
this.baseSchemas = requireNonNull(baseSchemas);
this.id = id;
this.reconnectOnSchemasChange = reconnectOnSchemasChange;
final BaseSchema baseSchema = resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported());
final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.getEffectiveModelContext(), listener,
new NetconfMessageTransformer(baseSchema.getMountPointContext(), false, baseSchema));
- final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(
- new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver));
+ final ListenableFuture<DeviceSources> sourceResolverFuture = Futures.submit(
+ new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver),
+ processingExecutor);
if (shouldListenOnSchemaChange(remoteSessionCapabilities)) {
registerToBaseNetconfStream(initRpc, listener);
import static java.util.Objects.requireNonNull;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.concurrent.Executor;
import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
private SchemaResourcesDTO schemaResourcesDTO;
private RemoteDeviceId id;
private RemoteDeviceHandler salFacade;
- private ListeningExecutorService globalProcessingExecutor;
+ private Executor globalProcessingExecutor;
private DeviceActionFactory deviceActionFactory;
private BaseNetconfSchemas baseSchemas;
- public NetconfDeviceBuilder() {
- }
-
public NetconfDeviceBuilder setReconnectOnSchemasChange(final boolean reconnectOnSchemasChange) {
this.reconnectOnSchemasChange = reconnectOnSchemasChange;
return this;
return this;
}
- public NetconfDeviceBuilder setGlobalProcessingExecutor(final ListeningExecutorService globalProcessingExecutor) {
+ public NetconfDeviceBuilder setGlobalProcessingExecutor(final Executor globalProcessingExecutor) {
this.globalProcessingExecutor = globalProcessingExecutor;
return this;
}
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Executors;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.opendaylight.mdsal.dom.api.DOMNotification;
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setBaseSchemas(BASE_SCHEMAS)
.setId(getId())
.setSalFacade(facade)
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
final NetconfDevice device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(true)
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
netconfSpy.onRemoteSessionUp(sessionCaps.replaceModuleCaps(moduleBasedCaps), listener);
- final ArgumentCaptor<NetconfDeviceSchema> argument = ArgumentCaptor.forClass(NetconfDeviceSchema.class);
+ final ArgumentCaptor<NetconfDeviceSchema> argument = ArgumentCaptor.forClass(NetconfDeviceSchema.class);
verify(facade, timeout(5000)).onDeviceConnected(argument.capture(), any(NetconfSessionPreferences.class),
any(RemoteDeviceServices.class));
argument.getValue().capabilities().resolvedCapabilities()
getSchemaRegistry(), getSchemaRepository(), schemaContextProviderFactory, STATE_SCHEMAS_RESOLVER);
final NetconfDevice device = new NetconfDeviceBuilder()
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
verify(facade, timeout(5000)).onDeviceConnected(argument.capture(), any(NetconfSessionPreferences.class),
any(RemoteDeviceServices.class));
- List<String> notificationModulesName = Arrays.asList(
+ List<String> notificationModulesName = List.of(
org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
.$YangModuleInfoImpl.getInstance().getName().toString(),
org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715
getSchemaRegistry(), getSchemaRepository(), schemaContextProviderFactory, STATE_SCHEMAS_RESOLVER);
final NetconfDevice device = new NetconfDeviceBuilder()
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
getSchemaRegistry(), getSchemaRepository(), schemaContextProviderFactory, STATE_SCHEMAS_RESOLVER);
final NetconfDevice device = new NetconfDeviceBuilder()
.setSchemaResourcesDTO(schemaResourcesDTO)
- .setGlobalProcessingExecutor(getExecutor())
+ .setGlobalProcessingExecutor(MoreExecutors.directExecutor())
.setId(getId())
.setSalFacade(facade)
.setBaseSchemas(BASE_SCHEMAS)
.build();
final NetconfDevice netconfSpy = spy(device);
- final NetconfSessionPreferences sessionCaps = getSessionCaps(false, Collections.emptyList());
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(false, List.of());
final Map<QName, CapabilityOrigin> moduleBasedCaps = new HashMap<>();
moduleBasedCaps.put(org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714
return new RemoteDeviceId("test-D", InetSocketAddress.createUnresolved("localhost", 22));
}
- public ListeningExecutorService getExecutor() {
- return MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
- }
-
public NetconfSessionPreferences getSessionCaps(final boolean addMonitor,
final Collection<String> additionalCapabilities) {
final var capabilities = new ArrayList<String>();
}
public NetconfDeviceCommunicator getListener() throws Exception {
- final NetconfDeviceCommunicator remoteDeviceCommunicator = mockCloseableClass(NetconfDeviceCommunicator.class);
-// doReturn(Futures.immediateFuture(rpcResult))
-// .when(remoteDeviceCommunicator).sendRequest(any(NetconfMessage.class), any(QName.class));
- return remoteDeviceCommunicator;
+ return mockCloseableClass(NetconfDeviceCommunicator.class);
}
}