X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfNodeActorTest.java;h=8065c32f17c1ad01cb427bf9b85b131b6821a02e;hb=8e59d67f1b7580c2135cbcc229d4c377c8cc1b09;hp=3bb0125c700dfeee95de8f3bb6500cfc6150664c;hpb=c8fa0b47c985456271ee71ed18f2d25e93225cad;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java index 3bb0125c70..8065c32f17 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java @@ -5,68 +5,86 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.MockitoAnnotations.initMocks; import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; -import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.pattern.AskTimeoutException; import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; import akka.util.Timeout; -import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.io.ByteSource; +import com.google.common.net.InetAddresses; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Scanner; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.mockito.ArgumentMatcher; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.mdsal.dom.api.DOMRpcException; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.netconf.topology.singleton.messages.NotMasterException; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -81,12 +99,14 @@ import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; +import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository; +import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -94,301 +114,493 @@ import scala.concurrent.duration.Duration; public class NetconfNodeActorTest { private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds")); - private static ActorSystem system; + private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1"); + private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2"); + + private ActorSystem system = ActorSystem.create(); + private final TestKit testKit = new TestKit(system); @Rule public final ExpectedException exception = ExpectedException.none(); private ActorRef masterRef; private RemoteDeviceId remoteDeviceId; + private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master"); @Mock - private DOMRpcService domRpcService; + private DOMRpcService mockDOMRpcService; + @Mock - private DOMMountPointService mountPointService; + private DOMMountPointService mockMountPointService; + @Mock - private DataBroker dataBroker; + private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder; + + @Mock + private ObjectRegistration mockMountPointReg; + + @Mock + private DOMDataBroker mockDOMDataBroker; + + @Mock + private SchemaSourceRegistration mockSchemaSourceReg1; + + @Mock + private SchemaSourceRegistration mockSchemaSourceReg2; + + @Mock + private SchemaSourceRegistry mockRegistry; + + @Mock + private SchemaContextFactory mockSchemaContextFactory; + + @Mock + private SchemaRepository mockSchemaRepository; + + @Mock + private SchemaContext mockSchemaContext; + + @Mock + private SchemaResourcesDTO schemaResourceDTO; @Before - public void setup() throws UnknownHostException { - initMocks - (this); + public void setup() { + initMocks(this); + remoteDeviceId = new RemoteDeviceId("netconf-topology", - new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999)); - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); + new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999)); + + masterSchemaRepository.registerSchemaSourceListener( + TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository)); - final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, - DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService); + doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository(); + doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry(); + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system) + .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).setSchemaResourceDTO(schemaResourceDTO).build(); - system = ActorSystem.create(); + final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService); masterRef = TestActorRef.create(system, props, "master_messages"); + + resetMountPointMocks(); + + doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any()); + + doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2)); + + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); } @After public void teardown() { - JavaTestKit.shutdownActorSystem(system); + TestKit.shutdownActorSystem(system, true); system = null; } @Test - public void testInitDataMessages() throws Exception { + public void testInitializeAndRefreshMasterData() { + + // Test CreateInitialMasterActorData. - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = Lists.newArrayList(); + initializeMaster(new ArrayList<>()); - /* Test init master data */ + // Test RefreshSetupMasterActorData. - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); + final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2", + new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999)); - final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); - assertTrue(success instanceof MasterActorDataInitialized); + final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create() + .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build(); + masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef()); - /* Test refresh master data */ + testKit.expectMsgClass(MasterActorDataInitialized.class); + } + + @Test + public void tesAskForMasterMountPoint() { - final RemoteDeviceId remoteDeviceId2 = new RemoteDeviceId("netconf-topology2", - new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 9999)); + // Test with master not setup yet. - final NetconfTopologySetup setup2 = mock(NetconfTopologySetup.class); + final TestKit kit = new TestKit(system); - final Future refreshDataToActor = - Patterns.ask(masterRef, new RefreshSetupMasterActorData(setup2, remoteDeviceId2), - TIMEOUT); + masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef()); - final Object success2 = Await.result(refreshDataToActor, TIMEOUT.duration()); - assertTrue(success2 instanceof MasterActorDataInitialized); + final Failure failure = kit.expectMsgClass(Failure.class); + assertTrue(failure.cause() instanceof NotMasterException); + // Now initialize - master should send the RegisterMountPoint message. + + List sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID")); + initializeMaster(sourceIdentifiers); + + masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef()); + + final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class); + + assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers()); } @Test - public void testRegisterMountPointMessage() throws Exception { + public void testRegisterAndUnregisterMountPoint() throws Exception { - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = - Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent())); + ActorRef slaveRef = registerSlaveMountPoint(); - // init master data + // Unregister - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); + slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef()); - final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration()); + verify(mockMountPointReg, timeout(5000)).close(); + verify(mockSchemaSourceReg1, timeout(1000)).close(); + verify(mockSchemaSourceReg2, timeout(1000)).close(); - assertTrue(successInit instanceof MasterActorDataInitialized); + // Test registration with another interleaved registration that completes while the first registration + // is resolving the schema context. - // test if slave get right identifiers from master + reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository); + resetMountPointMocks(); - final Future registerMountPointFuture = - Patterns.ask(masterRef, new AskForMasterMountPoint(), - TIMEOUT); + doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); - final RegisterMountPoint success = - (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration()); + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); - assertEquals(sourceIdentifiers, success.getSourceIndentifiers()); + final SchemaSourceRegistration newMockSchemaSourceReg = mock(SchemaSourceRegistration.class); - } + final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class); + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(newMockSchemaContextFactory).createSchemaContext(any()); + + doAnswer(unused -> { + SettableFuture future = SettableFuture.create(); + new Thread(() -> { + doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(), + withSourceId(SOURCE_IDENTIFIER1)); + + doReturn(newMockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), + testKit.getRef()); + + future.set(mockSchemaContext); + }).start(); + return future; + }).when(mockSchemaContextFactory).createSchemaContext(any()); + + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any()); + verify(mockSchemaSourceReg1).close(); + verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class)); + verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg); + + // Stop the slave actor and verify schema source registrations are closed. - @Test - public void testReceiveRegisterMountpoint() throws Exception { - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); - final RevisionSourceIdentifier yang1 = RevisionSourceIdentifier.create("yang1"); - final RevisionSourceIdentifier yang2 = RevisionSourceIdentifier.create("yang2"); - final SchemaSourceRegistry registry = mock(SchemaSourceRegistry.class); - final SchemaRepository schemaRepository = mock(SchemaRepository.class); - final SchemaSourceRegistration regYang1 = mock(SchemaSourceRegistration.class); - final SchemaSourceRegistration regYang2 = mock(SchemaSourceRegistration.class); - doReturn(regYang1).when(registry).registerSchemaSource(any(), withSourceId(yang1)); - doReturn(regYang2).when(registry).registerSchemaSource(any(), withSourceId(yang2)); - final SchemaContextFactory schemaContextFactory = mock(SchemaContextFactory.class); - doReturn(schemaContextFactory).when(schemaRepository).createSchemaContextFactory(any()); - final SettableFuture schemaContextFuture = SettableFuture.create(); - final CheckedFuture checkedFuture = - Futures.makeChecked(schemaContextFuture, e -> new SchemaResolutionException("fail", e)); - doReturn(checkedFuture).when(schemaContextFactory).createSchemaContext(any()); - final ActorRef slaveRef = - system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT, - mountPointService)); - final List sources = ImmutableList.of(yang1, yang2); - slaveRef.tell(new RegisterMountPoint(sources), masterRef); - - verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1)); - verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2)); - //stop actor final Future stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration()); Await.result(stopFuture, TIMEOUT.duration()); - //provider should be deregistered - verify(regYang1).close(); - verify(regYang2).close(); + + verify(mockMountPointReg).close(); + verify(newMockSchemaSourceReg).close(); } + @SuppressWarnings("unchecked") @Test - public void testYangTextSchemaSourceRequestMessage() throws Exception { - final SchemaRepository schemaRepository = mock(SchemaRepository.class); - final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent()); - final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId, - DEFAULT_SCHEMA_REPOSITORY, schemaRepository, TIMEOUT, mountPointService); + public void testRegisterMountPointWithSchemaFailures() throws Exception { + SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class); + doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry(); + doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository(); + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2) + .setActorSystem(system).build(); - final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository"); - final ActorContext actorContext = mock(ActorContext.class); - doReturn(system.dispatcher()).when(actorContext).dispatcher(); + final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, + mockMountPointService)); - final ProxyYangTextSourceProvider proxyYang = - new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext, TIMEOUT); - // test if asking for source is resolved and sended back + // Test unrecoverable failure. - final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) { - @Override - protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) { - return null; - } + doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock"))) + .when(mockSchemaContextFactory).createSchemaContext(any()); - @Override - public InputStream openStream() throws IOException { - return new ByteArrayInputStream("YANG".getBytes()); - } - }; + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + testKit.expectMsgClass(Success.class); - final CheckedFuture result = - Futures.immediateCheckedFuture(yangTextSchemaSource); + verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2)); - doReturn(result).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); + verify(mockMountPointBuilder, after(1000).never()).register(); + verify(mockSchemaSourceReg1, timeout(1000)).close(); + verify(mockSchemaSourceReg2, timeout(1000)).close(); - final Future resolvedSchema = - proxyYang.getYangTextSchemaSource(sourceIdentifier); + // Test recoverable AskTimeoutException - schema context resolution should be retried. - final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchema, TIMEOUT.duration()); + reset(mockSchemaSourceReg1, mockSchemaSourceReg2); - assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier()); - assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream())); + doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock", + new AskTimeoutException("timeout")))) + .doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactory).createSchemaContext(any()); + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); - // test if asking for source is missing - exception.expect(MissingSchemaSourceException.class); + testKit.expectMsgClass(Success.class); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2); + + // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution + // attempt should not be retried. + + reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory); + resetMountPointMocks(); - final SchemaSourceException schemaSourceException = - new MissingSchemaSourceException("Fail", sourceIdentifier); + final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class); + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactorySuccess).createSchemaContext(any()); - final CheckedFuture resultFail = - Futures.immediateFailedCheckedFuture(schemaSourceException); + doAnswer(unused -> { + SettableFuture future = SettableFuture.create(); + new Thread(() -> { + doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); - doReturn(resultFail).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); - final Future failedSchema = - proxyYang.getYangTextSchemaSource(sourceIdentifier); + future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout"))); + }).start(); + return future; + }).when(mockSchemaContextFactory).createSchemaContext(any()); - Await.result(failedSchema, TIMEOUT.duration()); + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class)); + } + + @Test(expected = MissingSchemaSourceException.class) + public void testMissingSchemaSourceOnMissingProvider() throws Exception { + SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class); + doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRegistry(); + doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRepository(); + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system) + .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build(); + final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService); + ActorRef actor = TestActorRef.create(system, props, "master_messages_2"); + + final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID"); + + final ProxyYangTextSourceProvider proxyYangProvider = + new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT); + + final Future resolvedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); + Await.result(resolvedSchemaFuture, TIMEOUT.duration()); } @Test - public void testProxyDOMRpcService() throws Exception { + public void testYangTextSchemaSourceRequest() throws Exception { + final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID"); - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = - Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent())); + final ProxyYangTextSourceProvider proxyYangProvider = + new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT); - // init master data + final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier, + ByteSource.wrap("YANG".getBytes(UTF_8))); - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); + // Test success. - final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration()); + final SchemaSourceRegistration schemaSourceReg = masterSchemaRepository + .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource), + PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1)); - assertTrue(successInit instanceof MasterActorDataInitialized); + final Future resolvedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); - // test if slave get right identifiers from master + final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration()); - final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId, TIMEOUT); + assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier()); + assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream())); - final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname")); - final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname"))) - .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build(); - final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed."); - // EmptyResultResponse message + // Test missing source failure. + + exception.expect(MissingSchemaSourceException.class); + + schemaSourceReg.close(); + + final Future failedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); - doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any()); + Await.result(failedSchemaFuture, TIMEOUT.duration()); + } + + @Test + @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"}) + public void testSlaveInvokeRpc() throws Throwable { - final CheckedFuture resultFutureEmpty = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + final List sourceIdentifiers = + Lists.newArrayList(RevisionSourceIdentifier.create("testID")); - final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS); + initializeMaster(sourceIdentifiers); + registerSlaveMountPoint(); - assertEquals(null, resultNull); + ArgumentCaptor domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture()); - // InvokeRpcMessageReply message + final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue(); + assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService); - doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode))). - when(domRpcService).invokeRpc(any(), any()); + final QName testQName = QName.create("", "TestQname"); + final SchemaPath schemaPath = SchemaPath.create(true, testQName); + final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName)) + .withChild(ImmutableNodes.leafNode(testQName, "foo")).build(); + final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed."); - final CheckedFuture resultFutureNn = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + // RPC with no response output. - final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS); + doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any()); - assertEquals(outputNode, resultNn.getResult()); - assertTrue(resultNn.getErrors().isEmpty()); + DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); - // InvokeRpcMessageReply message only error + assertEquals(null, result); - doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError))) - .when(domRpcService).invokeRpc(any(), any()); + // RPC with response output. - final CheckedFuture resultFutureError = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode))) + .when(mockDOMRpcService).invokeRpc(any(), any()); - final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS); + result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); - assertNull(resultError.getResult()); - assertEquals(rpcError, resultError.getErrors().iterator().next()); + assertEquals(outputNode, result.getResult()); + assertTrue(result.getErrors().isEmpty()); - // InvokeRpcMessageReply message error + result + // RPC with response error. - doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError))) - .when(domRpcService).invokeRpc(any(), any()); + doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError))) + .when(mockDOMRpcService).invokeRpc(any(), any()); - final CheckedFuture resultFutureOutputError = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); - final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS); + assertNull(result.getResult()); + assertEquals(rpcError, result.getErrors().iterator().next()); + + // RPC with response output and error. + + doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError))) + .when(mockDOMRpcService).invokeRpc(any(), any()); + + final DOMRpcResult resultOutputError = + slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); assertEquals(outputNode, resultOutputError.getResult()); assertEquals(rpcError, resultOutputError.getErrors().iterator().next()); - // Throwable message + // RPC failure. exception.expect(DOMRpcException.class); - doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException(""))) - .when(domRpcService).invokeRpc(any(), any()); + doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock"))) + .when(mockDOMRpcService).invokeRpc(any(), any()); + + try { + slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void testSlaveNewTransactionRequests() { + + doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction(); + doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction(); + doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction(); + + initializeMaster(Collections.emptyList()); + registerSlaveMountPoint(); + + ArgumentCaptor domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture()); + + final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue(); + assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker); + + slaveDOMDataBroker.newReadOnlyTransaction(); + verify(mockDOMDataBroker).newReadOnlyTransaction(); - final CheckedFuture resultFutureThrowable = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + slaveDOMDataBroker.newReadWriteTransaction(); + verify(mockDOMDataBroker).newReadWriteTransaction(); - resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS); + slaveDOMDataBroker.newWriteOnlyTransaction(); + verify(mockDOMDataBroker).newWriteOnlyTransaction(); + } + + private ActorRef registerSlaveMountPoint() { + SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class); + doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry(); + doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository(); + final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props( + NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system) + .build(), remoteDeviceId, TIMEOUT, mockMountPointService)); + + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactory).createSchemaContext(any()); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any()); + + testKit.expectMsgClass(Success.class); + return slaveRef; } - private PotentialSchemaSource withSourceId(final SourceIdentifier identifier) { - return argThat(new ArgumentMatcher() { - @Override - public boolean matches(final Object argument) { - final PotentialSchemaSource potentialSchemaSource = (PotentialSchemaSource) argument; - return identifier.equals(potentialSchemaSource.getSourceIdentifier()); - } - }); + private void initializeMaster(final List sourceIdentifiers) { + masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers, + mockDOMRpcService), testKit.getRef()); + + testKit.expectMsgClass(MasterActorDataInitialized.class); + } + + private void resetMountPointMocks() { + reset(mockMountPointReg, mockMountPointBuilder); + + doNothing().when(mockMountPointReg).close(); + + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any()); + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any()); + doReturn(mockMountPointReg).when(mockMountPointBuilder).register(); } - private String convertStreamToString(final java.io.InputStream is) { - final java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); - return s.hasNext() ? s.next() : ""; + private static PotentialSchemaSource withSourceId(final SourceIdentifier identifier) { + return argThat(argument -> identifier.equals(argument.getSourceIdentifier())); } + private static String convertStreamToString(final InputStream is) { + try (Scanner scanner = new Scanner(is)) { + return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; + } + } }