package org.opendaylight.netconf.topology.singleton.impl;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
-import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.io.ByteSource;
+import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.List;
+import java.util.Scanner;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
public class NetconfNodeActorTest {
private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
- private static ActorSystem system;
+ private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
+ private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
+
+ private ActorSystem system = ActorSystem.create();
+ private final TestKit testKit = new TestKit(system);
@Rule
public final ExpectedException exception = ExpectedException.none();
private ActorRef masterRef;
private RemoteDeviceId remoteDeviceId;
+ private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
+
+ @Mock
+ private DOMRpcService mockDOMRpcService;
@Mock
- private DOMRpcService domRpcService;
+ private DOMMountPointService mockMountPointService;
+
@Mock
- private DOMMountPointService mountPointService;
+ private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
+
+ @Mock
+ private ObjectRegistration<DOMMountPoint> mockMountPointReg;
+
+ @Mock
+ private DOMDataBroker mockDOMDataBroker;
+
+ @Mock
+ private SchemaSourceRegistration<?> mockSchemaSourceReg1;
+
+ @Mock
+ private SchemaSourceRegistration<?> mockSchemaSourceReg2;
+
@Mock
- private DataBroker dataBroker;
+ private SchemaSourceRegistry mockRegistry;
+
+ @Mock
+ private SchemaContextFactory mockSchemaContextFactory;
+
+ @Mock
+ private SchemaRepository mockSchemaRepository;
+
+ @Mock
+ private SchemaContext mockSchemaContext;
@Before
- public void setup() throws UnknownHostException {
- initMocks
- (this);
+ public void setup() {
+ initMocks(this);
+
remoteDeviceId = new RemoteDeviceId("netconf-topology",
- new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
- final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+ new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
+
+ masterSchemaRepository.registerSchemaSourceListener(
+ TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
- final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
+ final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
+ .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build();
- system = ActorSystem.create();
+ final Props props = NetconfNodeActor.props(setup, remoteDeviceId, masterSchemaRepository,
+ masterSchemaRepository, TIMEOUT, mockMountPointService);
masterRef = TestActorRef.create(system, props, "master_messages");
+
+ resetMountPointMocks();
+
+ doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
+
+ doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
+ doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
+
+ doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+ .createSchemaContextFactory(any(SchemaSourceFilter.class));
}
@After
public void teardown() {
- JavaTestKit.shutdownActorSystem(system);
+ TestKit.shutdownActorSystem(system, Boolean.TRUE);
system = null;
}
@Test
- public void testInitDataMessages() throws Exception {
+ public void testInitializeAndRefreshMasterData() {
- final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
- final List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList();
+ // Test CreateInitialMasterActorData.
- /* Test init master data */
+ initializeMaster(Lists.newArrayList());
- final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
- domRpcService), TIMEOUT);
+ // Test RefreshSetupMasterActorData.
- final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
- assertTrue(success instanceof MasterActorDataInitialized);
+ final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
+ new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
+ final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
- /* Test refresh master data */
+ masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
- final RemoteDeviceId remoteDeviceId2 = new RemoteDeviceId("netconf-topology2",
- new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 9999));
+ testKit.expectMsgClass(MasterActorDataInitialized.class);
+ }
+
+ @Test
+ public void tesAskForMasterMountPoint() {
+
+ // Test with master not setup yet.
+
+ final TestKit kit = new TestKit(system);
+
+ masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
+
+ final Failure failure = kit.expectMsgClass(Failure.class);
+ assertTrue(failure.cause() instanceof NotMasterException);
- final NetconfTopologySetup setup2 = mock(NetconfTopologySetup.class);
+ // Now initialize - master should send the RegisterMountPoint message.
- final Future<Object> refreshDataToActor =
- Patterns.ask(masterRef, new RefreshSetupMasterActorData(setup2, remoteDeviceId2),
- TIMEOUT);
+ List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
+ initializeMaster(sourceIdentifiers);
- final Object success2 = Await.result(refreshDataToActor, TIMEOUT.duration());
- assertTrue(success2 instanceof MasterActorDataInitialized);
+ masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
+ final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
+
+ assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
}
@Test
- public void testRegisterMountPointMessage() throws Exception {
+ public void testRegisterAndUnregisterMountPoint() throws Exception {
- final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
- final List<SourceIdentifier> sourceIdentifiers =
- Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
+ ActorRef slaveRef = registerSlaveMountPoint();
- // init master data
+ // Unregister
- final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
- domRpcService), TIMEOUT);
+ slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
- final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
+ verify(mockMountPointReg, timeout(5000)).close();
+ verify(mockSchemaSourceReg1, timeout(1000)).close();
+ verify(mockSchemaSourceReg2, timeout(1000)).close();
- assertTrue(successInit instanceof MasterActorDataInitialized);
+ // Test registration with another interleaved registration that completes while the first registration
+ // is resolving the schema context.
- // test if slave get right identifiers from master
+ reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
+ resetMountPointMocks();
- final Future<Object> registerMountPointFuture =
- Patterns.ask(masterRef, new AskForMasterMountPoint(),
- TIMEOUT);
+ doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
- final RegisterMountPoint success =
- (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration());
+ doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+ .createSchemaContextFactory(any(SchemaSourceFilter.class));
- assertEquals(sourceIdentifiers, success.getSourceIndentifiers());
+ final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
- }
+ final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class);
+ doReturn(Futures.immediateFuture(mockSchemaContext))
+ .when(newMockSchemaContextFactory).createSchemaContext(any());
+
+ doAnswer(unused -> {
+ SettableFuture<SchemaContext> future = SettableFuture.create();
+ new Thread(() -> {
+ doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
+ withSourceId(SOURCE_IDENTIFIER1));
+
+ doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
+ .createSchemaContextFactory(any(SchemaSourceFilter.class));
+
+ slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
+ testKit.getRef());
+
+ future.set(mockSchemaContext);
+ }).start();
+ return future;
+ }).when(mockSchemaContextFactory).createSchemaContext(any());
+
+ doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+ .createSchemaContextFactory(any(SchemaSourceFilter.class));
+
+ slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
+
+ verify(mockMountPointBuilder, timeout(5000)).register();
+ verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext);
+ verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
+ verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
+ verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
+ verify(mockSchemaSourceReg1).close();
+ verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
+ verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
+ verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
+
+ // Stop the slave actor and verify schema source registrations are closed.
- @Test
- public void testReceiveRegisterMountpoint() throws Exception {
- final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
- final RevisionSourceIdentifier yang1 = RevisionSourceIdentifier.create("yang1");
- final RevisionSourceIdentifier yang2 = RevisionSourceIdentifier.create("yang2");
- final SchemaSourceRegistry registry = mock(SchemaSourceRegistry.class);
- final SchemaRepository schemaRepository = mock(SchemaRepository.class);
- final SchemaSourceRegistration regYang1 = mock(SchemaSourceRegistration.class);
- final SchemaSourceRegistration regYang2 = mock(SchemaSourceRegistration.class);
- doReturn(regYang1).when(registry).registerSchemaSource(any(), withSourceId(yang1));
- doReturn(regYang2).when(registry).registerSchemaSource(any(), withSourceId(yang2));
- final SchemaContextFactory schemaContextFactory = mock(SchemaContextFactory.class);
- doReturn(schemaContextFactory).when(schemaRepository).createSchemaContextFactory(any());
- final SettableFuture<SchemaContext> schemaContextFuture = SettableFuture.create();
- final CheckedFuture<SchemaContext, SchemaResolutionException> checkedFuture =
- Futures.makeChecked(schemaContextFuture, e -> new SchemaResolutionException("fail", e));
- doReturn(checkedFuture).when(schemaContextFactory).createSchemaContext(any());
- final ActorRef slaveRef =
- system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT,
- mountPointService));
- final List<SourceIdentifier> sources = ImmutableList.of(yang1, yang2);
- slaveRef.tell(new RegisterMountPoint(sources), masterRef);
-
- verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1));
- verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2));
- //stop actor
final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
Await.result(stopFuture, TIMEOUT.duration());
- //provider should be deregistered
- verify(regYang1).close();
- verify(regYang2).close();
+
+ verify(mockMountPointReg).close();
+ verify(newMockSchemaSourceReg).close();
}
+ @SuppressWarnings("unchecked")
@Test
- public void testYangTextSchemaSourceRequestMessage() throws Exception {
- final SchemaRepository schemaRepository = mock(SchemaRepository.class);
- final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent());
- final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId,
- DEFAULT_SCHEMA_REPOSITORY, schemaRepository, TIMEOUT, mountPointService);
+ public void testRegisterMountPointWithSchemaFailures() throws Exception {
+ final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
- final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository");
- final ActorContext actorContext = mock(ActorContext.class);
- doReturn(system.dispatcher()).when(actorContext).dispatcher();
+ final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, mockRegistry,
+ mockSchemaRepository, TIMEOUT, mockMountPointService));
- final ProxyYangTextSourceProvider proxyYang =
- new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext, TIMEOUT);
- // test if asking for source is resolved and sended back
+ // Test unrecoverable failure.
- final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) {
- @Override
- protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) {
- return null;
- }
+ doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
+ .when(mockSchemaContextFactory).createSchemaContext(any());
- @Override
- public InputStream openStream() throws IOException {
- return new ByteArrayInputStream("YANG".getBytes());
- }
- };
+ slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+ masterRef), testKit.getRef());
+ testKit.expectMsgClass(Success.class);
- final CheckedFuture<YangTextSchemaSource, SchemaSourceException> result =
- Futures.immediateCheckedFuture(yangTextSchemaSource);
+ verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
+ verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
- doReturn(result).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
+ verify(mockMountPointBuilder, after(1000).never()).register();
+ verify(mockSchemaSourceReg1, timeout(1000)).close();
+ verify(mockSchemaSourceReg2, timeout(1000)).close();
- final Future<YangTextSchemaSourceSerializationProxy> resolvedSchema =
- proxyYang.getYangTextSchemaSource(sourceIdentifier);
+ // Test recoverable AskTimeoutException - schema context resolution should be retried.
- final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchema, TIMEOUT.duration());
+ reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
- assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
- assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
+ doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
+ new AskTimeoutException("timeout"))))
+ .doReturn(Futures.immediateFuture(mockSchemaContext))
+ .when(mockSchemaContextFactory).createSchemaContext(any());
+ slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+ masterRef), testKit.getRef());
- // test if asking for source is missing
- exception.expect(MissingSchemaSourceException.class);
+ testKit.expectMsgClass(Success.class);
+
+ verify(mockMountPointBuilder, timeout(5000)).register();
+ verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
+
+ // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
+ // attempt should not be retried.
- final SchemaSourceException schemaSourceException =
- new MissingSchemaSourceException("Fail", sourceIdentifier);
+ reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
+ resetMountPointMocks();
- final CheckedFuture<YangTextSchemaSource, SchemaSourceException> resultFail =
- Futures.immediateFailedCheckedFuture(schemaSourceException);
+ final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class);
+ doReturn(Futures.immediateFuture(mockSchemaContext))
+ .when(mockSchemaContextFactorySuccess).createSchemaContext(any());
- doReturn(resultFail).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
+ doAnswer(unused -> {
+ SettableFuture<SchemaContext> future = SettableFuture.create();
+ new Thread(() -> {
+ doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
+ .createSchemaContextFactory(any(SchemaSourceFilter.class));
- final Future<YangTextSchemaSourceSerializationProxy> failedSchema =
- proxyYang.getYangTextSchemaSource(sourceIdentifier);
+ slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+ masterRef), testKit.getRef());
- Await.result(failedSchema, TIMEOUT.duration());
+ future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
+ }).start();
+ return future;
+ }).when(mockSchemaContextFactory).createSchemaContext(any());
+ doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+ .createSchemaContextFactory(any(SchemaSourceFilter.class));
+
+ slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+ masterRef), testKit.getRef());
+
+ verify(mockMountPointBuilder, timeout(5000)).register();
+ verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
}
@Test
- public void testProxyDOMRpcService() throws Exception {
+ public void testYangTextSchemaSourceRequest() throws Exception {
+ final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
- final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
- final List<SourceIdentifier> sourceIdentifiers =
- Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
+ final ProxyYangTextSourceProvider proxyYangProvider =
+ new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
- // init master data
+ final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
+ ByteSource.wrap("YANG".getBytes(UTF_8)));
- final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
- domRpcService), TIMEOUT);
+ // Test success.
- final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
+ final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
+ .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
+ PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
- assertTrue(successInit instanceof MasterActorDataInitialized);
+ final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
+ proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
- // test if slave get right identifiers from master
+ final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
+
+ assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
+ assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
+
+ // Test missing source failure.
+
+ exception.expect(MissingSchemaSourceException.class);
- final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId, TIMEOUT);
+ schemaSourceReg.close();
- final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname"));
+ final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
+ proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
+
+ Await.result(failedSchemaFuture, TIMEOUT.duration());
+ }
+
+ @Test
+ @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
+ public void testSlaveInvokeRpc() throws Throwable {
+
+ final List<SourceIdentifier> sourceIdentifiers =
+ Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
+
+ initializeMaster(sourceIdentifiers);
+ registerSlaveMountPoint();
+
+ ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
+ verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
+
+ final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
+ assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
+
+ final QName testQName = QName.create("", "TestQname");
+ final SchemaPath schemaPath = SchemaPath.create(true, testQName);
final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
- .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
+ .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
- // EmptyResultResponse message
-
- doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any());
- final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureEmpty =
- slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+ // RPC with no response output.
- final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS);
+ doReturn(Futures.immediateCheckedFuture(null)).when(mockDOMRpcService).invokeRpc(any(), any());
- assertEquals(null, resultNull);
+ DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
- // InvokeRpcMessageReply message
+ assertEquals(null, result);
- doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode))).
- when(domRpcService).invokeRpc(any(), any());
+ // RPC with response output.
- final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureNn =
- slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+ doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode)))
+ .when(mockDOMRpcService).invokeRpc(any(), any());
- final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS);
+ result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
- assertEquals(outputNode, resultNn.getResult());
- assertTrue(resultNn.getErrors().isEmpty());
+ assertEquals(outputNode, result.getResult());
+ assertTrue(result.getErrors().isEmpty());
- // InvokeRpcMessageReply message only error
+ // RPC with response error.
doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
- .when(domRpcService).invokeRpc(any(), any());
-
- final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureError =
- slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+ .when(mockDOMRpcService).invokeRpc(any(), any());
- final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS);
+ result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
- assertNull(resultError.getResult());
- assertEquals(rpcError, resultError.getErrors().iterator().next());
+ assertNull(result.getResult());
+ assertEquals(rpcError, result.getErrors().iterator().next());
- // InvokeRpcMessageReply message error + result
+ // RPC with response output and error.
doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
- .when(domRpcService).invokeRpc(any(), any());
+ .when(mockDOMRpcService).invokeRpc(any(), any());
- final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureOutputError =
- slaveDomRPCService.invokeRpc(schemaPath, outputNode);
-
- final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS);
+ final DOMRpcResult resultOutputError =
+ slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
assertEquals(outputNode, resultOutputError.getResult());
assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
- // Throwable message
+ // RPC failure.
exception.expect(DOMRpcException.class);
- doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("")))
- .when(domRpcService).invokeRpc(any(), any());
+ doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("mock")))
+ .when(mockDOMRpcService).invokeRpc(any(), any());
+
+ try {
+ slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ @Test
+ public void testSlaveNewTransactionRequests() {
+
+ doReturn(mock(DOMDataReadOnlyTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
+ doReturn(mock(DOMDataReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
+ doReturn(mock(DOMDataWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
+
+ initializeMaster(Collections.emptyList());
+ registerSlaveMountPoint();
+
+ ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
+ verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
+
+ final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
+ assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
+
+ slaveDOMDataBroker.newReadOnlyTransaction();
+ verify(mockDOMDataBroker).newReadOnlyTransaction();
+
+ slaveDOMDataBroker.newReadWriteTransaction();
+ verify(mockDOMDataBroker).newReadWriteTransaction();
+
+ slaveDOMDataBroker.newWriteOnlyTransaction();
+ verify(mockDOMDataBroker).newWriteOnlyTransaction();
+ }
+
+ private ActorRef registerSlaveMountPoint() {
+ final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
+ NetconfTopologySetupBuilder.create().setActorSystem(system).build(), remoteDeviceId, mockRegistry,
+ mockSchemaRepository, TIMEOUT, mockMountPointService));
+
+ doReturn(Futures.immediateFuture(mockSchemaContext))
+ .when(mockSchemaContextFactory).createSchemaContext(any());
+
+ slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+ masterRef), testKit.getRef());
+
+ verify(mockMountPointBuilder, timeout(5000)).register();
+ verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext);
+ verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
+ verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
+ verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
+
+ testKit.expectMsgClass(Success.class);
- final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureThrowable =
- slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+ return slaveRef;
+ }
+
+ private void initializeMaster(List<SourceIdentifier> sourceIdentifiers) {
+ masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers,
+ mockDOMRpcService), testKit.getRef());
+
+ testKit.expectMsgClass(MasterActorDataInitialized.class);
+ }
+
+ private void resetMountPointMocks() {
+ reset(mockMountPointReg, mockMountPointBuilder);
- resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS);
+ doNothing().when(mockMountPointReg).close();
+ doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
+ doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
+ doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
}
private PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
- return argThat(new ArgumentMatcher<PotentialSchemaSource>() {
+ return argThat(new ArgumentMatcher<PotentialSchemaSource<?>>() {
@Override
public boolean matches(final Object argument) {
- final PotentialSchemaSource potentialSchemaSource = (PotentialSchemaSource) argument;
+ final PotentialSchemaSource<?> potentialSchemaSource = (PotentialSchemaSource<?>) argument;
return identifier.equals(potentialSchemaSource.getSourceIdentifier());
}
});
}
- private String convertStreamToString(final java.io.InputStream is) {
- final java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
- return s.hasNext() ? s.next() : "";
+ private String convertStreamToString(final InputStream is) {
+ try (Scanner scanner = new Scanner(is)) {
+ return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
+ }
}
-
}