X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfNodeActorTest.java;h=5abb6ce320a87034961515730249c803a8321c80;hb=3eac4fad8be9aae42fd76037e1c0a8ef5dc7b608;hp=4e50b34943c0adb019b3a1c6ca1231fed336ada5;hpb=6d7e12bf3ef64e5004703a1d540e7e26f30a9595;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java index 4e50b34943..5abb6ce320 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java @@ -8,51 +8,64 @@ package org.opendaylight.netconf.topology.singleton.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.MockitoAnnotations.initMocks; -import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; -import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.pattern.AskTimeoutException; import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; import akka.util.Timeout; -import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.io.ByteSource; import com.google.common.net.InetAddresses; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.List; +import java.util.Scanner; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; @@ -61,11 +74,15 @@ import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; +import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.netconf.topology.singleton.messages.NotMasterException; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; +import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; +import org.opendaylight.yangtools.concepts.ObjectRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -80,12 +97,14 @@ import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; -import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; +import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository; +import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -93,302 +112,467 @@ import scala.concurrent.duration.Duration; public class NetconfNodeActorTest { private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds")); - private static ActorSystem system; + private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1"); + private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2"); + + private ActorSystem system = ActorSystem.create(); + private final TestKit testKit = new TestKit(system); @Rule public final ExpectedException exception = ExpectedException.none(); private ActorRef masterRef; private RemoteDeviceId remoteDeviceId; + private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master"); + + @Mock + private DOMRpcService mockDOMRpcService; + + @Mock + private DOMMountPointService mockMountPointService; + + @Mock + private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder; + + @Mock + private ObjectRegistration mockMountPointReg; + + @Mock + private DOMDataBroker mockDOMDataBroker; + + @Mock + private SchemaSourceRegistration mockSchemaSourceReg1; @Mock - private DOMRpcService domRpcService; + private SchemaSourceRegistration mockSchemaSourceReg2; + + @Mock + private SchemaSourceRegistry mockRegistry; + @Mock - private DOMMountPointService mountPointService; + private SchemaContextFactory mockSchemaContextFactory; + @Mock - private DataBroker dataBroker; + private SchemaRepository mockSchemaRepository; + + @Mock + private SchemaContext mockSchemaContext; @Before public void setup() { initMocks(this); + remoteDeviceId = new RemoteDeviceId("netconf-topology", new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999)); - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); - final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, - DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService); + masterSchemaRepository.registerSchemaSourceListener( + TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository)); - system = ActorSystem.create(); + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system) + .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build(); + + final Props props = NetconfNodeActor.props(setup, remoteDeviceId, masterSchemaRepository, + masterSchemaRepository, TIMEOUT, mockMountPointService); masterRef = TestActorRef.create(system, props, "master_messages"); + + resetMountPointMocks(); + + doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any()); + + doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2)); + + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); } @After public void teardown() { - JavaTestKit.shutdownActorSystem(system); + TestKit.shutdownActorSystem(system, Boolean.TRUE); system = null; } @Test - public void testInitDataMessages() throws Exception { + public void testInitializeAndRefreshMasterData() { - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = Lists.newArrayList(); + // Test CreateInitialMasterActorData. - /* Test init master data */ + initializeMaster(Lists.newArrayList()); - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); + // Test RefreshSetupMasterActorData. - final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); - assertTrue(success instanceof MasterActorDataInitialized); + final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2", + new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999)); + final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setActorSystem(system).build(); - /* Test refresh master data */ + masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef()); - final RemoteDeviceId remoteDeviceId2 = new RemoteDeviceId("netconf-topology2", - new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999)); + testKit.expectMsgClass(MasterActorDataInitialized.class); + } + + @Test + public void tesAskForMasterMountPoint() { + + // Test with master not setup yet. + + final TestKit kit = new TestKit(system); + + masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef()); - final NetconfTopologySetup setup2 = mock(NetconfTopologySetup.class); + final Failure failure = kit.expectMsgClass(Failure.class); + assertTrue(failure.cause() instanceof NotMasterException); - final Future refreshDataToActor = - Patterns.ask(masterRef, new RefreshSetupMasterActorData(setup2, remoteDeviceId2), - TIMEOUT); + // Now initialize - master should send the RegisterMountPoint message. - final Object success2 = Await.result(refreshDataToActor, TIMEOUT.duration()); - assertTrue(success2 instanceof MasterActorDataInitialized); + List sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID")); + initializeMaster(sourceIdentifiers); + masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef()); + + final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class); + + assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers()); } @Test - public void testRegisterMountPointMessage() throws Exception { + public void testRegisterAndUnregisterMountPoint() throws Exception { - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = - Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent())); + ActorRef slaveRef = registerSlaveMountPoint(); - // init master data + // Unregister - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); + slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef()); - final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration()); + verify(mockMountPointReg, timeout(5000)).close(); + verify(mockSchemaSourceReg1, timeout(1000)).close(); + verify(mockSchemaSourceReg2, timeout(1000)).close(); - assertTrue(successInit instanceof MasterActorDataInitialized); + // Test registration with another interleaved registration that completes while the first registration + // is resolving the schema context. - // test if slave get right identifiers from master + reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository); + resetMountPointMocks(); - final Future registerMountPointFuture = - Patterns.ask(masterRef, new AskForMasterMountPoint(), - TIMEOUT); + doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); - final RegisterMountPoint success = - (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration()); + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); - assertEquals(sourceIdentifiers, success.getSourceIndentifiers()); + final SchemaSourceRegistration newMockSchemaSourceReg = mock(SchemaSourceRegistration.class); - } + final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class); + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(newMockSchemaContextFactory).createSchemaContext(any()); + + doAnswer(unused -> { + SettableFuture future = SettableFuture.create(); + new Thread(() -> { + doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(), + withSourceId(SOURCE_IDENTIFIER1)); + + doReturn(newMockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), + testKit.getRef()); + + future.set(mockSchemaContext); + }).start(); + return future; + }).when(mockSchemaContextFactory).createSchemaContext(any()); + + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any()); + verify(mockSchemaSourceReg1).close(); + verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class)); + verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg); + + // Stop the slave actor and verify schema source registrations are closed. - @Test - public void testReceiveRegisterMountpoint() throws Exception { - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); - final RevisionSourceIdentifier yang1 = RevisionSourceIdentifier.create("yang1"); - final RevisionSourceIdentifier yang2 = RevisionSourceIdentifier.create("yang2"); - final SchemaSourceRegistry registry = mock(SchemaSourceRegistry.class); - final SchemaRepository schemaRepository = mock(SchemaRepository.class); - final SchemaSourceRegistration regYang1 = mock(SchemaSourceRegistration.class); - final SchemaSourceRegistration regYang2 = mock(SchemaSourceRegistration.class); - doReturn(regYang1).when(registry).registerSchemaSource(any(), withSourceId(yang1)); - doReturn(regYang2).when(registry).registerSchemaSource(any(), withSourceId(yang2)); - final SchemaContextFactory schemaContextFactory = mock(SchemaContextFactory.class); - doReturn(schemaContextFactory).when(schemaRepository).createSchemaContextFactory(any()); - final SettableFuture schemaContextFuture = SettableFuture.create(); - final CheckedFuture checkedFuture = - Futures.makeChecked(schemaContextFuture, e -> new SchemaResolutionException("fail", e)); - doReturn(checkedFuture).when(schemaContextFactory).createSchemaContext(any()); - final ActorRef slaveRef = - system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT, - mountPointService)); - final List sources = ImmutableList.of(yang1, yang2); - slaveRef.tell(new RegisterMountPoint(sources), masterRef); - - verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1)); - verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2)); - //stop actor final Future stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration()); Await.result(stopFuture, TIMEOUT.duration()); - //provider should be deregistered - verify(regYang1).close(); - verify(regYang2).close(); + + verify(mockMountPointReg).close(); + verify(newMockSchemaSourceReg).close(); } + @SuppressWarnings("unchecked") @Test - public void testYangTextSchemaSourceRequestMessage() throws Exception { - final SchemaRepository schemaRepository = mock(SchemaRepository.class); - final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent()); - final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId, - DEFAULT_SCHEMA_REPOSITORY, schemaRepository, TIMEOUT, mountPointService); + public void testRegisterMountPointWithSchemaFailures() throws Exception { + final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system).build(); - final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository"); - final ActorContext actorContext = mock(ActorContext.class); - doReturn(system.dispatcher()).when(actorContext).dispatcher(); + final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, mockRegistry, + mockSchemaRepository, TIMEOUT, mockMountPointService)); - final ProxyYangTextSourceProvider proxyYang = - new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext, TIMEOUT); - // test if asking for source is resolved and sended back + // Test unrecoverable failure. - final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) { - @Override - protected MoreObjects.ToStringHelper addToStringAttributes( - final MoreObjects.ToStringHelper toStringHelper) { - return null; - } + doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock"))) + .when(mockSchemaContextFactory).createSchemaContext(any()); - @Override - public InputStream openStream() throws IOException { - return new ByteArrayInputStream("YANG".getBytes()); - } - }; + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + testKit.expectMsgClass(Success.class); - final CheckedFuture result = - Futures.immediateCheckedFuture(yangTextSchemaSource); + verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1)); + verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2)); - doReturn(result).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); + verify(mockMountPointBuilder, after(1000).never()).register(); + verify(mockSchemaSourceReg1, timeout(1000)).close(); + verify(mockSchemaSourceReg2, timeout(1000)).close(); - final Future resolvedSchema = - proxyYang.getYangTextSchemaSource(sourceIdentifier); + // Test recoverable AskTimeoutException - schema context resolution should be retried. - final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchema, TIMEOUT.duration()); + reset(mockSchemaSourceReg1, mockSchemaSourceReg2); - assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier()); - assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream())); + doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock", + new AskTimeoutException("timeout")))) + .doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactory).createSchemaContext(any()); + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); - // test if asking for source is missing - exception.expect(MissingSchemaSourceException.class); + testKit.expectMsgClass(Success.class); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2); + + // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution + // attempt should not be retried. + + reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory); + resetMountPointMocks(); + + final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class); + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactorySuccess).createSchemaContext(any()); - final SchemaSourceException schemaSourceException = - new MissingSchemaSourceException("Fail", sourceIdentifier); + doAnswer(unused -> { + SettableFuture future = SettableFuture.create(); + new Thread(() -> { + doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); - final CheckedFuture resultFail = - Futures.immediateFailedCheckedFuture(schemaSourceException); + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); - doReturn(resultFail).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class); + future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout"))); + }).start(); + return future; + }).when(mockSchemaContextFactory).createSchemaContext(any()); - final Future failedSchema = - proxyYang.getYangTextSchemaSource(sourceIdentifier); + doReturn(mockSchemaContextFactory).when(mockSchemaRepository) + .createSchemaContextFactory(any(SchemaSourceFilter.class)); - Await.result(failedSchema, TIMEOUT.duration()); + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class)); } @Test - public void testProxyDOMRpcService() throws Exception { + public void testYangTextSchemaSourceRequest() throws Exception { + final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID"); - final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); - final List sourceIdentifiers = - Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent())); + final ProxyYangTextSourceProvider proxyYangProvider = + new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT); + + final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier, + ByteSource.wrap("YANG".getBytes(UTF_8))); + + // Test success. - // init master data + final SchemaSourceRegistration schemaSourceReg = masterSchemaRepository + .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource), + PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1)); - final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, - domRpcService), TIMEOUT); + final Future resolvedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); - final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration()); + final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration()); - assertTrue(successInit instanceof MasterActorDataInitialized); + assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier()); + assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream())); - // test if slave get right identifiers from master + // Test missing source failure. - final ProxyDOMRpcService slaveDomRPCService = - new ProxyDOMRpcService(system, masterRef, remoteDeviceId, TIMEOUT); + exception.expect(MissingSchemaSourceException.class); - final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname")); + schemaSourceReg.close(); + + final Future failedSchemaFuture = + proxyYangProvider.getYangTextSchemaSource(sourceIdentifier); + + Await.result(failedSchemaFuture, TIMEOUT.duration()); + } + + @Test + @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"}) + public void testSlaveInvokeRpc() throws Throwable { + + final List sourceIdentifiers = + Lists.newArrayList(RevisionSourceIdentifier.create("testID")); + + initializeMaster(sourceIdentifiers); + registerSlaveMountPoint(); + + ArgumentCaptor domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture()); + + final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue(); + assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService); + + final QName testQName = QName.create("", "TestQname"); + final SchemaPath schemaPath = SchemaPath.create(true, testQName); final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname"))) - .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build(); + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName)) + .withChild(ImmutableNodes.leafNode(testQName, "foo")).build(); final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed."); - // EmptyResultResponse message - doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any()); + // RPC with no response output. - final CheckedFuture resultFutureEmpty = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + doReturn(Futures.immediateCheckedFuture(null)).when(mockDOMRpcService).invokeRpc(any(), any()); - final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS); + DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); - assertEquals(null, resultNull); + assertEquals(null, result); - // InvokeRpcMessageReply message + // RPC with response output. doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode))) - .when(domRpcService).invokeRpc(any(), any()); - - final CheckedFuture resultFutureNn = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + .when(mockDOMRpcService).invokeRpc(any(), any()); - final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS); + result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); - assertEquals(outputNode, resultNn.getResult()); - assertTrue(resultNn.getErrors().isEmpty()); + assertEquals(outputNode, result.getResult()); + assertTrue(result.getErrors().isEmpty()); - // InvokeRpcMessageReply message only error + // RPC with response error. doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError))) - .when(domRpcService).invokeRpc(any(), any()); + .when(mockDOMRpcService).invokeRpc(any(), any()); - final CheckedFuture resultFutureError = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); - final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS); + assertNull(result.getResult()); + assertEquals(rpcError, result.getErrors().iterator().next()); - assertNull(resultError.getResult()); - assertEquals(rpcError, resultError.getErrors().iterator().next()); - - // InvokeRpcMessageReply message error + result + // RPC with response output and error. doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError))) - .when(domRpcService).invokeRpc(any(), any()); - - final CheckedFuture resultFutureOutputError = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + .when(mockDOMRpcService).invokeRpc(any(), any()); - final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS); + final DOMRpcResult resultOutputError = + slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); assertEquals(outputNode, resultOutputError.getResult()); assertEquals(rpcError, resultOutputError.getErrors().iterator().next()); - // Throwable message + // RPC failure. exception.expect(DOMRpcException.class); - doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException(""))) - .when(domRpcService).invokeRpc(any(), any()); + doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("mock"))) + .when(mockDOMRpcService).invokeRpc(any(), any()); + + try { + slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void testSlaveNewTransactionRequests() { + + doReturn(mock(DOMDataReadOnlyTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction(); + doReturn(mock(DOMDataReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction(); + doReturn(mock(DOMDataWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction(); - final CheckedFuture resultFutureThrowable = - slaveDomRPCService.invokeRpc(schemaPath, outputNode); + initializeMaster(Collections.emptyList()); + registerSlaveMountPoint(); - resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS); + ArgumentCaptor domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture()); + final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue(); + assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker); + + slaveDOMDataBroker.newReadOnlyTransaction(); + verify(mockDOMDataBroker).newReadOnlyTransaction(); + + slaveDOMDataBroker.newReadWriteTransaction(); + verify(mockDOMDataBroker).newReadWriteTransaction(); + + slaveDOMDataBroker.newWriteOnlyTransaction(); + verify(mockDOMDataBroker).newWriteOnlyTransaction(); + } + + private ActorRef registerSlaveMountPoint() { + final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props( + NetconfTopologySetupBuilder.create().setActorSystem(system).build(), remoteDeviceId, mockRegistry, + mockSchemaRepository, TIMEOUT, mockMountPointService)); + + doReturn(Futures.immediateFuture(mockSchemaContext)) + .when(mockSchemaContextFactory).createSchemaContext(any()); + + slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2), + masterRef), testKit.getRef()); + + verify(mockMountPointBuilder, timeout(5000)).register(); + verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext); + verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any()); + verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any()); + + testKit.expectMsgClass(Success.class); + + return slaveRef; + } + + private void initializeMaster(List sourceIdentifiers) { + masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers, + mockDOMRpcService), testKit.getRef()); + + testKit.expectMsgClass(MasterActorDataInitialized.class); + } + + private void resetMountPointMocks() { + reset(mockMountPointReg, mockMountPointBuilder); + + doNothing().when(mockMountPointReg).close(); + + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any()); + doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any()); + doReturn(mockMountPointReg).when(mockMountPointBuilder).register(); } private PotentialSchemaSource withSourceId(final SourceIdentifier identifier) { - return argThat(new ArgumentMatcher() { + return argThat(new ArgumentMatcher>() { @Override public boolean matches(final Object argument) { - final PotentialSchemaSource potentialSchemaSource = (PotentialSchemaSource) argument; + final PotentialSchemaSource potentialSchemaSource = (PotentialSchemaSource) argument; return identifier.equals(potentialSchemaSource.getSourceIdentifier()); } }); } - private String convertStreamToString(final java.io.InputStream is) { - final java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); - return s.hasNext() ? s.next() : ""; + private String convertStreamToString(final InputStream is) { + try (Scanner scanner = new Scanner(is)) { + return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : ""; + } } - }