Bug 8197: Deregister schema sources on actor stop 98/55998/4
authorAndrej Mak <andrej.mak@pantheon.tech>
Mon, 24 Apr 2017 13:08:27 +0000 (15:08 +0200)
committerTomas Cere <tcere@cisco.com>
Thu, 27 Apr 2017 11:27:54 +0000 (11:27 +0000)
NetconfNodeActor registers schema source provider
to schema registry. When mountpoint is removed, this
registration should be removed too. If it isn't,
following issue can happen:

1. Create mountpoint for device1
2. Master actor for device1 is registered as mod-1.yang provider
3. Delete device1
4. Create mountpoint for device2
5. Master actor for device2 is registered as mod-1.yang provider
6. Register slave -
   schemaContextFactory.createSchemaContext(sourceIdentifiers)
   is called
7. Since dead device1 master is still registered as provider,
   ask in ProxyYangTextSourceProvider timeouts
8. After timeout device2 master is queried
9. Device 2 slave mountpoint registered

This delays slave mountpoint registration.

Change-Id: I060c8b1988ba7b54f9a93d7eb37adb5c5e48b23b
Signed-off-by: Andrej Mak <andrej.mak@pantheon.tech>
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java

index 571483afa9ba24e64a9c7b6f4529479620bf6a05..ab2b51f511cba87e700cb7dd8d0e61b72644be96 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
@@ -58,6 +59,7 @@ import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +81,7 @@ public class NetconfNodeActor extends UntypedActor {
     private DOMDataBroker deviceDataBroker;
     //readTxActor can be shared
     private ActorRef readTxActor;
+    private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
 
     public static Props props(final NetconfTopologySetup setup,
                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
@@ -159,6 +162,11 @@ public class NetconfNodeActor extends UntypedActor {
         }
     }
 
+    @Override
+    public void postStop() throws Exception {
+        super.postStop();
+        closeSchemaSourceRegistrations();
+    }
 
     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
         final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
@@ -213,6 +221,7 @@ public class NetconfNodeActor extends UntypedActor {
         if (this.slaveSalManager != null) {
             slaveSalManager.close();
         }
+        closeSchemaSourceRegistrations();
         slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
 
         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
@@ -244,9 +253,11 @@ public class NetconfNodeActor extends UntypedActor {
         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
                 getContext().dispatcher());
 
-        sourceIdentifiers.forEach(sourceId ->
-                schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
-                        YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+        registeredSchemas = sourceIdentifiers.stream()
+                .map(sourceId ->
+                        schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
+                                YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
+                .collect(Collectors.toList());
 
         final SchemaContextFactory schemaContextFactory
                 = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
@@ -254,4 +265,11 @@ public class NetconfNodeActor extends UntypedActor {
         return schemaContextFactory.createSchemaContext(sourceIdentifiers);
     }
 
+    private void closeSchemaSourceRegistrations() {
+        if (registeredSchemas != null) {
+            registeredSchemas.forEach(SchemaSourceRegistration::close);
+            registeredSchemas = null;
+        }
+    }
+
 }
index 63246e095198515b7ba2a6f582191c59e95f18cd..7a0c4b0733ee878c49fbcd3632acfdf7df419f37 100644 (file)
@@ -12,8 +12,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import static org.mockito.MockitoAnnotations.initMocks;
 import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
 
@@ -27,9 +30,11 @@ import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -43,6 +48,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
@@ -50,6 +56,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
@@ -66,13 +73,19 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -175,6 +188,39 @@ public class NetconfNodeActorTest {
 
     }
 
+    @Test
+    public void testReceiveRegisterMountpoint() throws Exception {
+        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+        doReturn(mock(Broker.class)).when(setup).getDomBroker();
+        final RevisionSourceIdentifier yang1 = RevisionSourceIdentifier.create("yang1");
+        final RevisionSourceIdentifier yang2 = RevisionSourceIdentifier.create("yang2");
+        final SchemaSourceRegistry registry = mock(SchemaSourceRegistry.class);
+        final SchemaRepository schemaRepository = mock(SchemaRepository.class);
+        final SchemaSourceRegistration regYang1 = mock(SchemaSourceRegistration.class);
+        final SchemaSourceRegistration regYang2 = mock(SchemaSourceRegistration.class);
+        doReturn(regYang1).when(registry).registerSchemaSource(any(), withSourceId(yang1));
+        doReturn(regYang2).when(registry).registerSchemaSource(any(), withSourceId(yang2));
+        final SchemaContextFactory schemaContextFactory = mock(SchemaContextFactory.class);
+        doReturn(schemaContextFactory).when(schemaRepository).createSchemaContextFactory(any());
+        final SettableFuture<SchemaContext> schemaContextFuture = SettableFuture.create();
+        final CheckedFuture<SchemaContext, SchemaResolutionException> checkedFuture =
+                Futures.makeChecked(schemaContextFuture, e -> new SchemaResolutionException("fail", e));
+        doReturn(checkedFuture).when(schemaContextFactory).createSchemaContext(any());
+        final ActorRef slaveRef =
+                system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository));
+        final List<SourceIdentifier> sources = ImmutableList.of(yang1, yang2);
+        slaveRef.tell(new RegisterMountPoint(sources), masterRef);
+
+        verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1));
+        verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2));
+        //stop actor
+        final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
+        Await.result(stopFuture, TIMEOUT.duration());
+        //provider should be deregistered
+        verify(regYang1).close();
+        verify(regYang2).close();
+    }
+
     @Test
     public void testYangTextSchemaSourceRequestMessage() throws Exception {
         final SchemaRepository schemaRepository = mock(SchemaRepository.class);
@@ -192,7 +238,7 @@ public class NetconfNodeActorTest {
 
         final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) {
             @Override
-            protected MoreObjects.ToStringHelper addToStringAttributes(MoreObjects.ToStringHelper toStringHelper) {
+            protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) {
                 return null;
             }
 
@@ -325,8 +371,18 @@ public class NetconfNodeActorTest {
 
     }
 
-    private String convertStreamToString(java.io.InputStream is) {
-        java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
+    private PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
+        return argThat(new ArgumentMatcher<PotentialSchemaSource>() {
+            @Override
+            public boolean matches(final Object argument) {
+                final PotentialSchemaSource potentialSchemaSource = (PotentialSchemaSource) argument;
+                return identifier.equals(potentialSchemaSource.getSourceIdentifier());
+            }
+        });
+    }
+
+    private String convertStreamToString(final java.io.InputStream is) {
+        final java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
         return s.hasNext() ? s.next() : "";
     }