From b0c1740dbf07ed82e50b9ad2bef515f76a018050 Mon Sep 17 00:00:00 2001 From: Andrej Mak Date: Tue, 9 May 2017 13:17:48 +0200 Subject: [PATCH] Bug 8405: Add close check to NetconfDevice Since schema resolution runs in its own thread, it is possible, that handleSalInitializationSuccess is called when NetconfDeviceCommunicator was closed meanwhile. Add check to prevent this. Change-Id: If93d32b26f0b98c4c0d47fdd65fdb5104db20bc5 Signed-off-by: Andrej Mak --- .../sal/connect/netconf/NetconfDevice.java | 44 ++++++++---- .../connect/netconf/NetconfDeviceTest.java | 72 ++++++++++++++----- 2 files changed, 83 insertions(+), 33 deletions(-) diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java index 45e67e3631..11efe5a25b 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDevice.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; @@ -81,7 +82,7 @@ public class NetconfDevice public static final Function QNAME_TO_SOURCE_ID_FUNCTION = input -> RevisionSourceIdentifier.create(input.getLocalName(), - Optional.fromNullable(input.getFormattedRevision())); + Optional.fromNullable(input.getFormattedRevision())); protected final RemoteDeviceId id; private final boolean reconnectOnSchemasChange; @@ -95,6 +96,8 @@ public class NetconfDevice private final NotificationHandler notificationHandler; protected final List> sourceRegistrations = Lists.newArrayList(); + @GuardedBy("this") + private boolean connected = false; // Message transformer is constructed once the schemas are available private MessageTransformer messageTransformer; @@ -135,6 +138,7 @@ public class NetconfDevice // Yang models are being downloaded in this method and it would cause a // deadlock if we used the netty thread // http://netty.io/wiki/thread-model.html + setConnected(true); LOG.debug("{}: Session to remote device established with {}", id, remoteSessionCapabilities); final NetconfDeviceRpc initRpc = @@ -170,12 +174,12 @@ public class NetconfDevice private void registerToBaseNetconfStream(final NetconfDeviceRpc deviceRpc, final NetconfDeviceCommunicator listener) { - // TODO check whether the model describing create subscription is present in schema + // TODO check whether the model describing create subscription is present in schema // Perhaps add a default schema context to support create-subscription if the model was not provided // (same as what we do for base netconf operations in transformer) final CheckedFuture rpcResultListenableFuture = deviceRpc.invokeRpc( - NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), - NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); + NetconfMessageTransformUtil.toPath(NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_QNAME), + NetconfMessageTransformUtil.CREATE_SUBSCRIPTION_RPC_CONTENT); final NotificationHandler.NotificationFilter filter = new NotificationHandler.NotificationFilter() { @Override @@ -213,24 +217,30 @@ public class NetconfDevice return remoteSessionCapabilities.isNotificationsSupported() && reconnectOnSchemasChange; } - void handleSalInitializationSuccess(final SchemaContext result, + private synchronized void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) { - final BaseSchema baseSchema = + //NetconfDevice.SchemaSetup can complete after NetconfDeviceCommunicator was closed. In that case do nothing, + //since salFacade.onDeviceDisconnected was already called. + if (connected) { + final BaseSchema baseSchema = remoteSessionCapabilities.isNotificationsSupported() ? BaseSchema.BASE_NETCONF_CTX_WITH_NOTIFICATIONS : BaseSchema.BASE_NETCONF_CTX; - messageTransformer = new NetconfMessageTransformer(result, true, baseSchema); + messageTransformer = new NetconfMessageTransformer(result, true, baseSchema); - updateTransformer(messageTransformer); - // salFacade.onDeviceConnected has to be called before the notification handler is initialized - salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc); - notificationHandler.onRemoteSchemaUp(messageTransformer); + updateTransformer(messageTransformer); + // salFacade.onDeviceConnected has to be called before the notification handler is initialized + salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc); + notificationHandler.onRemoteSchemaUp(messageTransformer); - LOG.info("{}: Netconf connector initialized successfully", id); + LOG.info("{}: Netconf connector initialized successfully", id); + } else { + LOG.warn("{}: Device communicator was closed before schema setup finished.", id); + } } - void handleSalInitializationFailure(final Throwable throwable, - final RemoteDeviceCommunicator listener) { + private void handleSalInitializationFailure(final Throwable throwable, + final RemoteDeviceCommunicator listener) { LOG.error("{}: Initialization in sal failed, disconnecting from device", id, throwable); listener.close(); onRemoteSessionDown(); @@ -248,6 +258,10 @@ public class NetconfDevice messageTransformer = transformer; } + private synchronized void setConnected(final boolean connected) { + this.connected = connected; + } + private void addProvidedSourcesToSchemaRegistry(final DeviceSources deviceSources) { final SchemaSourceProvider yangProvider = deviceSources.getSourceProvider(); for (final SourceIdentifier sourceId : deviceSources.getProvidedSources()) { @@ -259,6 +273,7 @@ public class NetconfDevice @Override public void onRemoteSessionDown() { + setConnected(false); notificationHandler.onRemoteSchemaDown(); salFacade.onDeviceDisconnected(); @@ -271,6 +286,7 @@ public class NetconfDevice @Override public void onRemoteSessionFailed(final Throwable throwable) { + setConnected(false); salFacade.onDeviceFailed(throwable); } diff --git a/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDeviceTest.java b/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDeviceTest.java index 318df85c8d..efa9d6f15c 100644 --- a/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDeviceTest.java +++ b/netconf/sal-netconf-connector/src/test/java/org/opendaylight/netconf/sal/connect/netconf/NetconfDeviceTest.java @@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollectionOf; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.after; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -26,6 +27,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -275,9 +277,13 @@ public class NetconfDeviceTest { public void testNotificationBeforeSchema() throws Exception { final RemoteDeviceHandler facade = getFacade(); final NetconfDeviceCommunicator listener = getListener(); - - final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO( - getSchemaRegistry(), getSchemaRepository(), getSchemaFactory(), STATE_SCHEMAS_RESOLVER); + final SchemaContextFactory schemaContextProviderFactory = mock(SchemaContextFactory.class); + final SettableFuture schemaFuture = SettableFuture.create(); + doReturn(Futures.makeChecked(schemaFuture, e -> new SchemaResolutionException("fail"))) + .when(schemaContextProviderFactory).createSchemaContext(any(Collection.class)); + final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = + new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaRepository(), + schemaContextProviderFactory, STATE_SCHEMAS_RESOLVER); final NetconfDevice device = new NetconfDeviceBuilder() .setReconnectOnSchemasChange(true) .setSchemaResourcesDTO(schemaResourcesDTO) @@ -286,19 +292,16 @@ public class NetconfDeviceTest { .setSalFacade(facade) .build(); - device.onNotification(NOTIFICATION); - device.onNotification(NOTIFICATION); - - verify(facade, times(0)).onNotification(any(DOMNotification.class)); - final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY)); + device.onRemoteSessionUp(sessionCaps, listener); - final DOMRpcService deviceRpc = mock(DOMRpcService.class); - - device.handleSalInitializationSuccess( - NetconfToNotificationTest.getNotificationSchemaContext(getClass(), false), sessionCaps, deviceRpc); + device.onNotification(NOTIFICATION); + device.onNotification(NOTIFICATION); + verify(facade, times(0)).onNotification(any(DOMNotification.class)); + verify(facade, times(0)).onNotification(any(DOMNotification.class)); + schemaFuture.set(NetconfToNotificationTest.getNotificationSchemaContext(getClass(), false)); verify(facade, timeout(10000).times(2)).onNotification(any(DOMNotification.class)); device.onNotification(NOTIFICATION); @@ -339,6 +342,37 @@ public class NetconfDeviceTest { any(SchemaContext.class), any(NetconfSessionPreferences.class), any(DOMRpcService.class)); } + @Test + public void testNetconfDeviceDisconnectListenerCallCancellation() throws Exception { + final RemoteDeviceHandler facade = getFacade(); + final NetconfDeviceCommunicator listener = getListener(); + final SchemaContextFactory schemaContextProviderFactory = mock(SchemaContextFactory.class); + final SettableFuture schemaFuture = SettableFuture.create(); + doReturn(Futures.makeChecked(schemaFuture, e -> new SchemaResolutionException("fail"))) + .when(schemaContextProviderFactory).createSchemaContext(any(Collection.class)); + final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO + = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), getSchemaRepository(), + schemaContextProviderFactory, STATE_SCHEMAS_RESOLVER); + final NetconfDevice device = new NetconfDeviceBuilder() + .setReconnectOnSchemasChange(true) + .setSchemaResourcesDTO(schemaResourcesDTO) + .setGlobalProcessingExecutor(getExecutor()) + .setId(getId()) + .setSalFacade(facade) + .build(); + final NetconfSessionPreferences sessionCaps = getSessionCaps(true, + Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION)); + //session up, start schema resolution + device.onRemoteSessionUp(sessionCaps, listener); + //session closed + device.onRemoteSessionDown(); + verify(facade, timeout(5000)).onDeviceDisconnected(); + //complete schema setup + schemaFuture.set(getSchema()); + //facade.onDeviceDisconnected() was called, so facade.onDeviceConnected() shouldn't be called anymore + verify(facade, after(1000).never()).onDeviceConnected(any(), any(), any()); + } + @Test public void testNetconfDeviceAvailableCapabilitiesBuilding() throws Exception { final RemoteDeviceHandler facade = getFacade(); @@ -355,22 +389,22 @@ public class NetconfDeviceTest { .setId(getId()) .setSalFacade(facade) .build(); - NetconfDevice netconfSpy = Mockito.spy(device); + final NetconfDevice netconfSpy = Mockito.spy(device); final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION)); - Map moduleBasedCaps = new HashMap<>(); + final Map moduleBasedCaps = new HashMap<>(); moduleBasedCaps.putAll(sessionCaps.getModuleBasedCapsOrigin()); moduleBasedCaps .put(QName.create("(test:qname:side:loading)test"), AvailableCapability.CapabilityOrigin.UserDefined); netconfSpy.onRemoteSessionUp(sessionCaps.replaceModuleCaps(moduleBasedCaps), listener); - ArgumentCaptor argument = ArgumentCaptor.forClass(NetconfSessionPreferences.class); - verify(netconfSpy, timeout(5000)).handleSalInitializationSuccess( - any(SchemaContext.class), (NetconfSessionPreferences) argument.capture(), any(DOMRpcService.class)); - NetconfDeviceCapabilities netconfDeviceCaps = - ((NetconfSessionPreferences) argument.getValue()).getNetconfDeviceCapabilities(); + final ArgumentCaptor argument = + ArgumentCaptor.forClass(NetconfSessionPreferences.class); + verify(facade, timeout(5000)) + .onDeviceConnected(any(SchemaContext.class), argument.capture(), any(DOMRpcService.class)); + final NetconfDeviceCapabilities netconfDeviceCaps = argument.getValue().getNetconfDeviceCapabilities(); netconfDeviceCaps.getResolvedCapabilities() .forEach(entry -> assertEquals("Builded 'AvailableCapability' schemas should match input capabilities.", -- 2.36.6