X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=plugins%2Fnetconf-client-mdsal%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fclient%2Fmdsal%2FNetconfDevice.java;h=cefb1cbc7444fff9656736daef5d61dad050d627;hb=1afaf777cb77151f3b04f76854a578fedd046627;hp=8b41bbff4a2ca2986c1c796888bc0bf49b2946e2;hpb=b9ec0da5de634765a97034df88a1250d89fa12c4;p=netconf.git diff --git a/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDevice.java b/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDevice.java index 8b41bbff4a..cefb1cbc74 100644 --- a/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDevice.java +++ b/plugins/netconf-client-mdsal/src/main/java/org/opendaylight/netconf/client/mdsal/NetconfDevice.java @@ -19,9 +19,9 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import java.io.Serial; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -30,13 +30,15 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.stream.Collectors; import org.checkerframework.checker.lock.qual.GuardedBy; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.netconf.api.CapabilityURN; -import org.opendaylight.netconf.api.NetconfMessage; +import org.opendaylight.netconf.api.messages.NetconfMessage; import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas; import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory; import org.opendaylight.netconf.client.mdsal.api.NetconfDeviceSchemasResolver; @@ -52,17 +54,16 @@ import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil; import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformer; import org.opendaylight.netconf.client.mdsal.spi.NetconfDeviceRpc; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapability; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.available.capabilities.AvailableCapabilityBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.available.capabilities.AvailableCapabilityBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.unavailable.capabilities.UnavailableCapability.FailureReason; import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext; -import org.opendaylight.yangtools.rfc8528.data.util.EmptyMountPointContext; import org.opendaylight.yangtools.rfc8528.model.api.SchemaMountConstants; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MountPointContext; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory; @@ -83,7 +84,7 @@ public class NetconfDevice implements RemoteDevice { private static final QName RFC8528_SCHEMA_MOUNTS_QNAME = QName.create( SchemaMountConstants.RFC8528_MODULE, "schema-mounts").intern(); - private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.create( + private static final YangInstanceIdentifier RFC8528_SCHEMA_MOUNTS = YangInstanceIdentifier.of( NodeIdentifier.create(RFC8528_SCHEMA_MOUNTS_QNAME)); protected final RemoteDeviceId id; @@ -94,29 +95,27 @@ public class NetconfDevice implements RemoteDevice { protected final List sourceRegistrations = new ArrayList<>(); private final RemoteDeviceHandler salFacade; - private final ListeningExecutorService processingExecutor; + private final Executor processingExecutor; private final DeviceActionFactory deviceActionFactory; private final NetconfDeviceSchemasResolver stateSchemasResolver; private final NotificationHandler notificationHandler; private final boolean reconnectOnSchemasChange; private final BaseNetconfSchemas baseSchemas; + @GuardedBy("this") + private ListenableFuture> schemaFuturesList; @GuardedBy("this") private boolean connected = false; - // Message transformer is constructed once the schemas are available - private NetconfMessageTransformer messageTransformer; - public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas, - final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange) { + final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final Executor globalProcessingExecutor, + final boolean reconnectOnSchemasChange) { this(schemaResourcesDTO, baseSchemas, id, salFacade, globalProcessingExecutor, reconnectOnSchemasChange, null); } public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final BaseNetconfSchemas baseSchemas, - final RemoteDeviceId id, final RemoteDeviceHandler salFacade, - final ListeningExecutorService globalProcessingExecutor, final boolean reconnectOnSchemasChange, - final DeviceActionFactory deviceActionFactory) { + final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final Executor globalProcessingExecutor, + final boolean reconnectOnSchemasChange, final DeviceActionFactory deviceActionFactory) { this.baseSchemas = requireNonNull(baseSchemas); this.id = id; this.reconnectOnSchemasChange = reconnectOnSchemasChange; @@ -131,52 +130,53 @@ public class NetconfDevice implements RemoteDevice { } @Override - public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, - final NetconfDeviceCommunicator listener) { - // SchemaContext setup has to be performed in a dedicated thread since - // we are in a netty thread in this method - // Yang models are being downloaded in this method and it would cause a - // deadlock if we used the netty thread - // http://netty.io/wiki/thread-model.html + public synchronized void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, + final NetconfDeviceCommunicator listener) { + // SchemaContext setup has to be performed in a dedicated thread since we are in a Netty thread in this method + // YANG models are being downloaded in this method and it would cause a deadlock if we used the netty thread + // https://netty.io/wiki/thread-model.html setConnected(true); LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities); final BaseSchema baseSchema = resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported()); final NetconfDeviceRpc initRpc = new NetconfDeviceRpc(baseSchema.getEffectiveModelContext(), listener, new NetconfMessageTransformer(baseSchema.getMountPointContext(), false, baseSchema)); - final ListenableFuture sourceResolverFuture = processingExecutor.submit( - new DeviceSourcesResolver(id, baseSchema, initRpc, remoteSessionCapabilities, stateSchemasResolver)); + final var sourceResolverFuture = Futures.submit(new DeviceSourcesResolver(id, baseSchema, initRpc, + remoteSessionCapabilities, stateSchemasResolver), processingExecutor); if (shouldListenOnSchemaChange(remoteSessionCapabilities)) { registerToBaseNetconfStream(initRpc, listener); } - // Set up the SchemaContext for the device - final ListenableFuture futureSchema = Futures.transformAsync(sourceResolverFuture, + // Set up the EffectiveModelContext for the device + final var futureSchema = Futures.transformAsync(sourceResolverFuture, deviceSources -> assembleSchemaContext(deviceSources, remoteSessionCapabilities), processingExecutor); // Potentially acquire mount point list and interpret it - final ListenableFuture futureContext = Futures.transformAsync(futureSchema, + final var netconfDeviceSchemaFuture = Futures.transformAsync(futureSchema, result -> Futures.transform(createMountPointContext(result.modelContext(), baseSchema, listener), mount -> new NetconfDeviceSchema(result.capabilities(), mount), processingExecutor), processingExecutor); + schemaFuturesList = Futures.allAsList(sourceResolverFuture, futureSchema, netconfDeviceSchemaFuture); - Futures.addCallback(futureContext, new FutureCallback<>() { - @Override - public void onSuccess(final NetconfDeviceSchema result) { - handleSalInitializationSuccess(result, remoteSessionCapabilities, - getDeviceSpecificRpc(result.mountContext(), listener, baseSchema), listener); - } + Futures.addCallback(netconfDeviceSchemaFuture, new FutureCallback<>() { + @Override + public void onSuccess(final NetconfDeviceSchema result) { + handleSalInitializationSuccess(listener, result, remoteSessionCapabilities, + getDeviceSpecificRpc(result.mountContext(), listener, baseSchema)); + } - @Override - public void onFailure(final Throwable cause) { - LOG.warn("{}: Unexpected error resolving device sources", id, cause); - // FIXME: this causes salFacade to see onDeviceDisconnected() and then onDeviceFailed(), which is quite - // weird - handleSalInitializationFailure(cause, listener); - salFacade.onDeviceFailed(cause); - } - }, MoreExecutors.directExecutor()); + @Override + public void onFailure(final Throwable cause) { + // The method onRemoteSessionDown was called while the EffectiveModelContext for the device + // was being processed. + if (cause instanceof CancellationException) { + LOG.warn("{}: Device communicator was tear down since the schema setup started", id); + } else { + handleSalInitializationFailure(listener, cause); + } + } + }, MoreExecutors.directExecutor()); } private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, @@ -192,7 +192,7 @@ public class NetconfDevice implements RemoteDevice { @Override public void onSuccess(final DOMRpcResult domRpcResult) { notificationHandler.addNotificationFilter(notification -> { - if (NetconfCapabilityChange.QNAME.equals(notification.getBody().getIdentifier().getNodeType())) { + if (NetconfCapabilityChange.QNAME.equals(notification.getBody().name().getNodeType())) { LOG.info("{}: Schemas change detected, reconnecting", id); // Only disconnect is enough, // the reconnecting nature of the connector will take care of reconnecting @@ -215,44 +215,46 @@ public class NetconfDevice implements RemoteDevice { return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange; } - private synchronized void handleSalInitializationSuccess(final NetconfDeviceSchema deviceSchema, - final NetconfSessionPreferences remoteSessionCapabilities, final Rpcs deviceRpc, - final RemoteDeviceCommunicator listener) { - //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing, - //since salFacade.onDeviceDisconnected was already called. - if (connected) { - final var mount = deviceSchema.mountContext(); - messageTransformer = new NetconfMessageTransformer(mount, true, - resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported())); - - // salFacade.onDeviceConnected has to be called before the notification handler is initialized - salFacade.onDeviceConnected(deviceSchema, remoteSessionCapabilities, - new RemoteDeviceServices(deviceRpc, deviceActionFactory == null ? null - : deviceActionFactory.createDeviceAction(messageTransformer, listener))); - notificationHandler.onRemoteSchemaUp(messageTransformer); - - LOG.info("{}: Netconf connector initialized successfully", id); - } else { + private synchronized void handleSalInitializationSuccess(final RemoteDeviceCommunicator listener, + final NetconfDeviceSchema deviceSchema, final NetconfSessionPreferences remoteSessionCapabilities, + final Rpcs deviceRpc) { + // NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing, + // since salFacade.onDeviceDisconnected was already called. + if (!connected) { LOG.warn("{}: Device communicator was closed before schema setup finished.", id); + return; } - } - private void handleSalInitializationFailure(final Throwable throwable, final RemoteDeviceCommunicator listener) { - LOG.error("{}: Initialization in sal failed, disconnecting from device", id, throwable); - listener.close(); - onRemoteSessionDown(); - resetMessageTransformer(); + final var messageTransformer = new NetconfMessageTransformer(deviceSchema.mountContext(), true, + resolveBaseSchema(remoteSessionCapabilities.isNotificationsSupported())); + + // Order is important here: salFacade has to see the device come up and then the notificationHandler can deliver + // whatever notifications have been held back + salFacade.onDeviceConnected(deviceSchema, remoteSessionCapabilities, + new RemoteDeviceServices(deviceRpc, deviceActionFactory == null ? null + : deviceActionFactory.createDeviceAction(messageTransformer, listener))); + notificationHandler.onRemoteSchemaUp(messageTransformer); + + LOG.info("{}: Netconf connector initialized successfully", id); } - /** - * Set the transformer to null as is in initial state. - */ - private void resetMessageTransformer() { - updateTransformer(null); + private void handleSalInitializationFailure(final RemoteDeviceCommunicator listener, final Throwable cause) { + LOG.warn("{}: Unexpected error resolving device sources", id, cause); + listener.close(); + cleanupInitialization(); + salFacade.onDeviceFailed(cause); } - private synchronized void updateTransformer(final NetconfMessageTransformer transformer) { - messageTransformer = transformer; + private synchronized void cleanupInitialization() { + connected = false; + if (schemaFuturesList != null && !schemaFuturesList.isDone()) { + if (!schemaFuturesList.cancel(true)) { + LOG.warn("The cleanup of Schema Futures for device {} was unsuccessful.", id); + } + } + notificationHandler.onRemoteSchemaDown(); + sourceRegistrations.forEach(Registration::close); + sourceRegistrations.clear(); } private synchronized void setConnected(final boolean connected) { @@ -271,7 +273,7 @@ public class NetconfDevice implements RemoteDevice { private ListenableFuture<@NonNull MountPointContext> createMountPointContext( final EffectiveModelContext schemaContext, final BaseSchema baseSchema, final NetconfDeviceCommunicator listener) { - final MountPointContext emptyContext = new EmptyMountPointContext(schemaContext); + final MountPointContext emptyContext = MountPointContext.of(schemaContext); if (schemaContext.findModule(SchemaMountConstants.RFC8528_MODULE).isEmpty()) { return Futures.immediateFuture(emptyContext); } @@ -303,19 +305,8 @@ public class NetconfDevice implements RemoteDevice { @Override public void onRemoteSessionDown() { - setConnected(false); - notificationHandler.onRemoteSchemaDown(); - + cleanupInitialization(); salFacade.onDeviceDisconnected(); - sourceRegistrations.forEach(Registration::close); - sourceRegistrations.clear(); - resetMessageTransformer(); - } - - @Override - public void onRemoteSessionFailed(final Throwable throwable) { - setConnected(false); - salFacade.onDeviceFailed(throwable); } @Override @@ -324,7 +315,7 @@ public class NetconfDevice implements RemoteDevice { } private BaseSchema resolveBaseSchema(final boolean notificationSupport) { - return notificationSupport ? baseSchemas.getBaseSchemaWithNotifications() : baseSchemas.getBaseSchema(); + return notificationSupport ? baseSchemas.baseSchemaWithNotifications() : baseSchemas.baseSchema(); } protected NetconfDeviceRpc getDeviceSpecificRpc(final MountPointContext result, @@ -373,6 +364,7 @@ public class NetconfDevice implements RemoteDevice { * A dedicated exception to indicate when we fail to setup an {@link EffectiveModelContext}. */ public static final class EmptySchemaContextException extends Exception { + @Serial private static final long serialVersionUID = 1L; public EmptySchemaContextException(final String message) {