X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfNodeActorTest.java;h=8065c32f17c1ad01cb427bf9b85b131b6821a02e;hb=8e59d67f1b7580c2135cbcc229d4c377c8cc1b09;hp=a77104e28dde8f96b2c1678ea9001bcdc80a933d;hpb=77eb72cb594acdcf302a9a49aeb9ced8281bab8c;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java index a77104e28d..8065c32f17 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java @@ -5,55 +5,108 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.MockitoAnnotations.initMocks; import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; -import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.pattern.AskTimeoutException; import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; import akka.util.Timeout; -import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.io.ByteSource; +import com.google.common.net.InetAddresses; import com.google.common.util.concurrent.Futures; -import java.io.ByteArrayInputStream; -import java.io.IOException; +import com.google.common.util.concurrent.SettableFuture; import java.io.InputStream; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Scanner; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; -import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMMountPoint; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.mdsal.dom.api.DOMRpcException; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; +import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.netconf.topology.singleton.messages.NotMasterException; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; +import org.opendaylight.yangtools.concepts.ObjectRegistration; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; +import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository; +import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -61,161 +114,493 @@ import scala.concurrent.duration.Duration; public class NetconfNodeActorTest { private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds")); - private static ActorSystem system; + private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1"); + private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2"); + + private ActorSystem system = ActorSystem.create(); + private final TestKit testKit = new TestKit(system); @Rule public final ExpectedException exception = ExpectedException.none(); private ActorRef masterRef; private RemoteDeviceId remoteDeviceId; + private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master"); + + @Mock + private DOMRpcService mockDOMRpcService; + + @Mock + private DOMMountPointService mockMountPointService; + + @Mock + private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder; + + @Mock + private ObjectRegistration mockMountPointReg; + + @Mock + private DOMDataBroker mockDOMDataBroker; + + @Mock + private SchemaSourceRegistration mockSchemaSourceReg1; + + @Mock + private SchemaSourceRegistration mockSchemaSourceReg2; + + @Mock + private SchemaSourceRegistry mockRegistry; + + @Mock + private SchemaContextFactory mockSchemaContextFactory; + + @Mock + private SchemaRepository mockSchemaRepository; + + @Mock + private SchemaContext mockSchemaContext; + + @Mock + private SchemaResourcesDTO schemaResourceDTO; @Before - public void setup() throws UnknownHostException { + public void setup() { + initMocks(this); remoteDeviceId = new RemoteDeviceId("netconf-topology", - new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999)); + new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999)); - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); + masterSchemaRepository.registerSchemaSourceListener( + TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository)); - final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, - DEFAULT_SCHEMA_REPOSITORY); + doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository(); + doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry(); + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system) + .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).setSchemaResourceDTO(schemaResourceDTO).build(); - system = ActorSystem.create(); + final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService); masterRef = TestActorRef.create(system, props, "master_messages"); + + resetMountPointMocks(); + + doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any()); + + doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2)); + + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); } @After public void teardown() { - JavaTestKit.shutdownActorSystem(system); + TestKit.shutdownActorSystem(system, true); system = null; } @Test - public void testInitDataMessages() throws Exception { + public void testInitializeAndRefreshMasterData() { + + // Test CreateInitialMasterActorData. + + initializeMaster(new ArrayList<>()); + + // Test RefreshSetupMasterActorData. + + final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2", + new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999)); - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = Lists.newArrayList(); + final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create() + .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build(); - /* Test init master data */ + masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef()); - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers), - TIMEOUT); + testKit.expectMsgClass(MasterActorDataInitialized.class); + } + + @Test + public void tesAskForMasterMountPoint() { + + // Test with master not setup yet. - final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); - assertTrue(success instanceof MasterActorDataInitialized); + final TestKit kit = new TestKit(system); + masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef()); - /* Test refresh master data */ + final Failure failure = kit.expectMsgClass(Failure.class); + assertTrue(failure.cause() instanceof NotMasterException); - final RemoteDeviceId remoteDeviceId2 = new RemoteDeviceId("netconf-topology2", - new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 9999)); + // Now initialize - master should send the RegisterMountPoint message. - final NetconfTopologySetup setup2 = mock(NetconfTopologySetup.class); + List sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID")); + initializeMaster(sourceIdentifiers); - final Future refreshDataToActor = - Patterns.ask(masterRef, new RefreshSetupMasterActorData(setup2, remoteDeviceId2), - TIMEOUT); + masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef()); - final Object success2 = Await.result(refreshDataToActor, TIMEOUT.duration()); - assertTrue(success2 instanceof MasterActorDataInitialized); + final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class); + assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers()); } @Test - public void testRegisterMountPointMessage() throws Exception { + public void testRegisterAndUnregisterMountPoint() throws Exception { - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = - Lists.newArrayList(SourceIdentifier.create("testID", Optional.absent())); + ActorRef slaveRef = registerSlaveMountPoint(); + + // Unregister + + slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef()); - // init master data + verify(mockMountPointReg, timeout(5000)).close(); + verify(mockSchemaSourceReg1, timeout(1000)).close(); + verify(mockSchemaSourceReg2, timeout(1000)).close(); - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers), - TIMEOUT); + // Test registration with another interleaved registration that completes while the first registration + // is resolving the schema context. - final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration()); + reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository); + resetMountPointMocks(); - assertTrue(successInit instanceof MasterActorDataInitialized); + doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); - // test if slave get right identifiers from master + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); - final Future registerMountPointFuture = - Patterns.ask(masterRef, new AskForMasterMountPoint(), - TIMEOUT); + final SchemaSourceRegistration newMockSchemaSourceReg = mock(SchemaSourceRegistration.class); - final RegisterMountPoint success = - (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration()); + final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class); + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(newMockSchemaContextFactory).createSchemaContext(any()); - assertEquals(sourceIdentifiers, success.getSourceIndentifiers()); + doAnswer(unused -> { + SettableFuture future = SettableFuture.create(); + new Thread(() -> { + doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(), + withSourceId(SOURCE_IDENTIFIER1)); + doReturn(newMockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), + testKit.getRef()); + + future.set(mockSchemaContext); + }).start(); + return future; + }).when(mockSchemaContextFactory).createSchemaContext(any()); + + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any()); + verify(mockSchemaSourceReg1).close(); + verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class)); + verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg); + + // Stop the slave actor and verify schema source registrations are closed. + + final Future stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration()); + Await.result(stopFuture, TIMEOUT.duration()); + + verify(mockMountPointReg).close(); + verify(newMockSchemaSourceReg).close(); } + @SuppressWarnings("unchecked") @Test - public void testYangTextSchemaSourceRequestMessage() throws Exception { - final SchemaRepository schemaRepository = mock(SchemaRepository.class); - final SourceIdentifier sourceIdentifier = SourceIdentifier.create("testID", Optional.absent()); - final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId, - DEFAULT_SCHEMA_REPOSITORY, schemaRepository); + public void testRegisterMountPointWithSchemaFailures() throws Exception { + SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class); + doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry(); + doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository(); + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2) + .setActorSystem(system).build(); + + final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, + mockMountPointService)); + + // Test unrecoverable failure. + + doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock"))) + .when(mockSchemaContextFactory).createSchemaContext(any()); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + + testKit.expectMsgClass(Success.class); + + verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2)); + + verify(mockMountPointBuilder, after(1000).never()).register(); + verify(mockSchemaSourceReg1, timeout(1000)).close(); + verify(mockSchemaSourceReg2, timeout(1000)).close(); - final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository"); - final ActorContext actorContext = mock(ActorContext.class); - doReturn(system.dispatcher()).when(actorContext).dispatcher(); + // Test recoverable AskTimeoutException - schema context resolution should be retried. - final ProxyYangTextSourceProvider proxyYang = - new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext); - // test if asking for source is resolved and sended back + reset(mockSchemaSourceReg1, mockSchemaSourceReg2); - final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) { - @Override - protected MoreObjects.ToStringHelper addToStringAttributes(MoreObjects.ToStringHelper toStringHelper) { - return null; - } + doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock", + new AskTimeoutException("timeout")))) + .doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactory).createSchemaContext(any()); - @Override - public InputStream openStream() throws IOException { - return new ByteArrayInputStream("YANG".getBytes()); - } - }; + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + testKit.expectMsgClass(Success.class); - final CheckedFuture result = - Futures.immediateCheckedFuture(yangTextSchemaSource); + verify(mockMountPointBuilder, timeout(5000)).register(); + verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2); - doReturn(result).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); + // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution + // attempt should not be retried. - final Future resolvedSchema = - proxyYang.getYangTextSchemaSource(sourceIdentifier); + reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory); + resetMountPointMocks(); - final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchema, TIMEOUT.duration()); + final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class); + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactorySuccess).createSchemaContext(any()); + + doAnswer(unused -> { + SettableFuture future = SettableFuture.create(); + new Thread(() -> { + doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + + future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout"))); + }).start(); + return future; + }).when(mockSchemaContextFactory).createSchemaContext(any()); + + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class)); + } + + @Test(expected = MissingSchemaSourceException.class) + public void testMissingSchemaSourceOnMissingProvider() throws Exception { + SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class); + doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRegistry(); + doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRepository(); + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system) + .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build(); + final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService); + ActorRef actor = TestActorRef.create(system, props, "master_messages_2"); + + final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID"); + + final ProxyYangTextSourceProvider proxyYangProvider = + new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT); + + final Future resolvedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); + Await.result(resolvedSchemaFuture, TIMEOUT.duration()); + } + + @Test + public void testYangTextSchemaSourceRequest() throws Exception { + final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID"); + + final ProxyYangTextSourceProvider proxyYangProvider = + new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT); + + final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier, + ByteSource.wrap("YANG".getBytes(UTF_8))); + + // Test success. + + final SchemaSourceRegistration schemaSourceReg = masterSchemaRepository + .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource), + PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1)); + + final Future resolvedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); + + final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration()); assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier()); assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream())); + // Test missing source failure. - // test if asking for source is missing exception.expect(MissingSchemaSourceException.class); - final SchemaSourceException schemaSourceException = - new MissingSchemaSourceException("Fail", sourceIdentifier); + schemaSourceReg.close(); + + final Future failedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); + + Await.result(failedSchemaFuture, TIMEOUT.duration()); + } + + @Test + @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"}) + public void testSlaveInvokeRpc() throws Throwable { + + final List sourceIdentifiers = + Lists.newArrayList(RevisionSourceIdentifier.create("testID")); + + initializeMaster(sourceIdentifiers); + registerSlaveMountPoint(); + + ArgumentCaptor domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture()); + + final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue(); + assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService); + + final QName testQName = QName.create("", "TestQname"); + final SchemaPath schemaPath = SchemaPath.create(true, testQName); + final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName)) + .withChild(ImmutableNodes.leafNode(testQName, "foo")).build(); + final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed."); + + // RPC with no response output. + + doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any()); + + DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); + + assertEquals(null, result); + + // RPC with response output. + + doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode))) + .when(mockDOMRpcService).invokeRpc(any(), any()); + + result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); + + assertEquals(outputNode, result.getResult()); + assertTrue(result.getErrors().isEmpty()); + + // RPC with response error. + + doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError))) + .when(mockDOMRpcService).invokeRpc(any(), any()); + + result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); + + assertNull(result.getResult()); + assertEquals(rpcError, result.getErrors().iterator().next()); - final CheckedFuture resultFail = - Futures.immediateFailedCheckedFuture(schemaSourceException); + // RPC with response output and error. - doReturn(resultFail).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); + doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError))) + .when(mockDOMRpcService).invokeRpc(any(), any()); - final Future failedSchema = - proxyYang.getYangTextSchemaSource(sourceIdentifier); + final DOMRpcResult resultOutputError = + slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); - Await.result(failedSchema, TIMEOUT.duration()); + assertEquals(outputNode, resultOutputError.getResult()); + assertEquals(rpcError, resultOutputError.getErrors().iterator().next()); + // RPC failure. + + exception.expect(DOMRpcException.class); + + doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock"))) + .when(mockDOMRpcService).invokeRpc(any(), any()); + + try { + slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void testSlaveNewTransactionRequests() { + + doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction(); + doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction(); + doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction(); + + initializeMaster(Collections.emptyList()); + registerSlaveMountPoint(); + + ArgumentCaptor domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture()); + + final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue(); + assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker); + + slaveDOMDataBroker.newReadOnlyTransaction(); + verify(mockDOMDataBroker).newReadOnlyTransaction(); + + slaveDOMDataBroker.newReadWriteTransaction(); + verify(mockDOMDataBroker).newReadWriteTransaction(); + + slaveDOMDataBroker.newWriteOnlyTransaction(); + verify(mockDOMDataBroker).newWriteOnlyTransaction(); + } + + private ActorRef registerSlaveMountPoint() { + SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class); + doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry(); + doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository(); + final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props( + NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system) + .build(), remoteDeviceId, TIMEOUT, mockMountPointService)); + + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactory).createSchemaContext(any()); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any()); + + testKit.expectMsgClass(Success.class); + + return slaveRef; + } + + private void initializeMaster(final List sourceIdentifiers) { + masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers, + mockDOMRpcService), testKit.getRef()); + + testKit.expectMsgClass(MasterActorDataInitialized.class); } - private String convertStreamToString(java.io.InputStream is) { - java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); - return s.hasNext() ? s.next() : ""; + private void resetMountPointMocks() { + reset(mockMountPointReg, mockMountPointBuilder); + + doNothing().when(mockMountPointReg).close(); + + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any()); + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any()); + doReturn(mockMountPointReg).when(mockMountPointBuilder).register(); } + private static PotentialSchemaSource withSourceId(final SourceIdentifier identifier) { + return argThat(argument -> identifier.equals(argument.getSourceIdentifier())); + } + + private static String convertStreamToString(final InputStream is) { + try (Scanner scanner = new Scanner(is)) { + return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; + } + } }