import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import java.io.Serial;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.api.CapabilityURN;
-import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.messages.NetconfMessage;
import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
import org.opendaylight.netconf.client.mdsal.api.NetconfDeviceSchemasResolver;
import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformer;
import org.opendaylight.netconf.client.mdsal.spi.NetconfDeviceRpc;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapability;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapabilityBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason;
import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext;
-import org.opendaylight.yangtools.rfc8528.data.util.EmptyMountPointContext;
import org.opendaylight.yangtools.rfc8528.model.api.SchemaMountConstants;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MountPointContext;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
private static final QName RFC8528_SCHEMA_MOUNTS_QNAME = QName.create(
SchemaMountConstants.RFC8528_MODULE, "schema-mounts").intern();
- private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.create(
+ private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.of(
NodeIdentifier.create(RFC8528_SCHEMA_MOUNTS_QNAME));
protected final RemoteDeviceId id;
protected final List<Registration> sourceRegistrations = new ArrayList<>();
private final RemoteDeviceHandler salFacade;
- private final ListeningExecutorService processingExecutor;
+ private final Executor processingExecutor;
private final DeviceActionFactory deviceActionFactory;
private final NetconfDeviceSchemasResolver stateSchemasResolver;
private final NotificationHandler notificationHandler;
private final boolean reconnectOnSchemasChange;
private final BaseNetconfSchemas baseSchemas;
+ @GuardedBy("this")
+ private ListenableFuture<List<Object>> schemaFuturesList;
@GuardedBy("this")
private boolean connected = false;
- // Message transformer is constructed once the schemas are available
- private NetconfMessageTransformer messageTransformer;
-
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
- final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) {
+ final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final Executor globalProcessingExecutor,
+ final boolean reconnectOnSchemasChange) {
this(schemaResourcesDTO, baseSchemas, id, salFacade, globalProcessingExecutor, reconnectOnSchemasChange, null);
}
public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas,
- final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange,
- final DeviceActionFactory deviceActionFactory) {
+ final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final Executor globalProcessingExecutor,
+ final boolean reconnectOnSchemasChange, final DeviceActionFactory deviceActionFactory) {
this.baseSchemas = requireNonNull(baseSchemas);
this.id = id;
this.reconnectOnSchemasChange = reconnectOnSchemasChange;
}
@Override
- public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
- final NetconfDeviceCommunicator listener) {
- // SchemaContext setup has to be performed in a dedicated thread since
- // we are in a netty thread in this method
- // Yang models are being downloaded in this method and it would cause a
- // deadlock if we used the netty thread
- // http://netty.io/wiki/thread-model.html
+ public synchronized void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
+ final NetconfDeviceCommunicator listener) {
+ // SchemaContext setup has to be performed in a dedicated thread since we are in a Netty thread in this method
+ // YANG models are being downloaded in this method and it would cause a deadlock if we used the netty thread
+ // https://netty.io/wiki/thread-model.html
setConnected(true);
LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities);
final BaseSchema baseSchema = resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported());
final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.getEffectiveModelContext(), listener,
new NetconfMessageTransformer(baseSchema.getMountPointContext(), false, baseSchema));
- final ListenableFuture<DeviceSources> sourceResolverFuture = processingExecutor.submit(
- new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver));
+ final var sourceResolverFuture = Futures.submit(new DeviceSourcesResolver(id, baseSchema, initRpc,
+ remoteSessionCapabilities, stateSchemasResolver), processingExecutor);
if (shouldListenOnSchemaChange(remoteSessionCapabilities)) {
registerToBaseNetconfStream(initRpc, listener);
}
- // Set up the SchemaContext for the device
- final ListenableFuture<SchemaResult> futureSchema = Futures.transformAsync(sourceResolverFuture,
+ // Set up the EffectiveModelContext for the device
+ final var futureSchema = Futures.transformAsync(sourceResolverFuture,
deviceSources -> assembleSchemaContext(deviceSources, remoteSessionCapabilities), processingExecutor);
// Potentially acquire mount point list and interpret it
- final ListenableFuture<NetconfDeviceSchema> futureContext = Futures.transformAsync(futureSchema,
+ final var netconfDeviceSchemaFuture = Futures.transformAsync(futureSchema,
result -> Futures.transform(createMountPointContext(result.modelContext(), baseSchema, listener),
mount -> new NetconfDeviceSchema(result.capabilities(), mount), processingExecutor),
processingExecutor);
+ schemaFuturesList = Futures.allAsList(sourceResolverFuture, futureSchema, netconfDeviceSchemaFuture);
- Futures.addCallback(futureContext, new FutureCallback<>() {
- @Override
- public void onSuccess(final NetconfDeviceSchema result) {
- handleSalInitializationSuccess(result, remoteSessionCapabilities,
- getDeviceSpecificRpc(result.mountContext(), listener, baseSchema), listener);
- }
+ Futures.addCallback(netconfDeviceSchemaFuture, new FutureCallback<>() {
+ @Override
+ public void onSuccess(final NetconfDeviceSchema result) {
+ handleSalInitializationSuccess(listener, result, remoteSessionCapabilities,
+ getDeviceSpecificRpc(result.mountContext(), listener, baseSchema));
+ }
- @Override
- public void onFailure(final Throwable cause) {
- LOG.warn("{}: Unexpected error resolving device sources", id, cause);
- // FIXME: this causes salFacade to see onDeviceDisconnected() and then onDeviceFailed(), which is quite
- // weird
- handleSalInitializationFailure(cause, listener);
- salFacade.onDeviceFailed(cause);
- }
- }, MoreExecutors.directExecutor());
+ @Override
+ public void onFailure(final Throwable cause) {
+ // The method onRemoteSessionDown was called while the EffectiveModelContext for the device
+ // was being processed.
+ if (cause instanceof CancellationException) {
+ LOG.warn("{}: Device communicator was tear down since the schema setup started", id);
+ } else {
+ handleSalInitializationFailure(listener, cause);
+ }
+ }
+ }, MoreExecutors.directExecutor());
}
private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc,
@Override
public void onSuccess(final DOMRpcResult domRpcResult) {
notificationHandler.addNotificationFilter(notification -> {
- if (NetconfCapabilityChange.QNAME.equals(notification.getBody().getIdentifier().getNodeType())) {
+ if (NetconfCapabilityChange.QNAME.equals(notification.getBody().name().getNodeType())) {
LOG.info("{}: Schemas change detected, reconnecting", id);
// Only disconnect is enough,
// the reconnecting nature of the connector will take care of reconnecting
return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange;
}
- private synchronized void handleSalInitializationSuccess(final NetconfDeviceSchema deviceSchema,
- final NetconfSessionPreferences remoteSessionCapabilities, final Rpcs deviceRpc,
- final RemoteDeviceCommunicator listener) {
- //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
- //since salFacade.onDeviceDisconnected was already called.
- if (connected) {
- final var mount = deviceSchema.mountContext();
- messageTransformer = new NetconfMessageTransformer(mount, true,
- resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported()));
-
- // salFacade.onDeviceConnected has to be called before the notification handler is initialized
- salFacade.onDeviceConnected(deviceSchema, remoteSessionCapabilities,
- new RemoteDeviceServices(deviceRpc, deviceActionFactory == null ? null
- : deviceActionFactory.createDeviceAction(messageTransformer, listener)));
- notificationHandler.onRemoteSchemaUp(messageTransformer);
-
- LOG.info("{}: Netconf connector initialized successfully", id);
- } else {
+ private synchronized void handleSalInitializationSuccess(final RemoteDeviceCommunicator listener,
+ final NetconfDeviceSchema deviceSchema, final NetconfSessionPreferences remoteSessionCapabilities,
+ final Rpcs deviceRpc) {
+ // NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing,
+ // since salFacade.onDeviceDisconnected was already called.
+ if (!connected) {
LOG.warn("{}: Device communicator was closed before schema setup finished.", id);
+ return;
}
- }
- private void handleSalInitializationFailure(final Throwable throwable, final RemoteDeviceCommunicator listener) {
- LOG.error("{}: Initialization in sal failed, disconnecting from device", id, throwable);
- listener.close();
- onRemoteSessionDown();
- resetMessageTransformer();
+ final var messageTransformer = new NetconfMessageTransformer(deviceSchema.mountContext(), true,
+ resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported()));
+
+ // Order is important here: salFacade has to see the device come up and then the notificationHandler can deliver
+ // whatever notifications have been held back
+ salFacade.onDeviceConnected(deviceSchema, remoteSessionCapabilities,
+ new RemoteDeviceServices(deviceRpc, deviceActionFactory == null ? null
+ : deviceActionFactory.createDeviceAction(messageTransformer, listener)));
+ notificationHandler.onRemoteSchemaUp(messageTransformer);
+
+ LOG.info("{}: Netconf connector initialized successfully", id);
}
- /**
- * Set the transformer to null as is in initial state.
- */
- private void resetMessageTransformer() {
- updateTransformer(null);
+ private void handleSalInitializationFailure(final RemoteDeviceCommunicator listener, final Throwable cause) {
+ LOG.warn("{}: Unexpected error resolving device sources", id, cause);
+ listener.close();
+ cleanupInitialization();
+ salFacade.onDeviceFailed(cause);
}
- private synchronized void updateTransformer(final NetconfMessageTransformer transformer) {
- messageTransformer = transformer;
+ private synchronized void cleanupInitialization() {
+ connected = false;
+ if (schemaFuturesList != null && !schemaFuturesList.isDone()) {
+ if (!schemaFuturesList.cancel(true)) {
+ LOG.warn("The cleanup of Schema Futures for device {} was unsuccessful.", id);
+ }
+ }
+ notificationHandler.onRemoteSchemaDown();
+ sourceRegistrations.forEach(Registration::close);
+ sourceRegistrations.clear();
}
private synchronized void setConnected(final boolean connected) {
private ListenableFuture<@NonNull MountPointContext> createMountPointContext(
final EffectiveModelContext schemaContext, final BaseSchema baseSchema,
final NetconfDeviceCommunicator listener) {
- final MountPointContext emptyContext = new EmptyMountPointContext(schemaContext);
+ final MountPointContext emptyContext = MountPointContext.of(schemaContext);
if (schemaContext.findModule(SchemaMountConstants.RFC8528_MODULE).isEmpty()) {
return Futures.immediateFuture(emptyContext);
}
@Override
public void onRemoteSessionDown() {
- setConnected(false);
- notificationHandler.onRemoteSchemaDown();
-
+ cleanupInitialization();
salFacade.onDeviceDisconnected();
- sourceRegistrations.forEach(Registration::close);
- sourceRegistrations.clear();
- resetMessageTransformer();
- }
-
- @Override
- public void onRemoteSessionFailed(final Throwable throwable) {
- setConnected(false);
- salFacade.onDeviceFailed(throwable);
}
@Override
}
private BaseSchema resolveBaseSchema(final boolean notificationSupport) {
- return notificationSupport ? baseSchemas.getBaseSchemaWithNotifications() : baseSchemas.getBaseSchema();
+ return notificationSupport ? baseSchemas.baseSchemaWithNotifications() : baseSchemas.baseSchema();
}
protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result,
* A dedicated exception to indicate when we fail to setup an {@link EffectiveModelContext}.
*/
public static final class EmptySchemaContextException extends Exception {
+ @Serial
private static final long serialVersionUID = 1L;
public EmptySchemaContextException(final String message) {