import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityKey;
-import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
@Test
public void testSourceToSink() throws InterruptedException, ExecutionException {
// Make sure to start source...
- final Registration source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
- Duration.ZERO, 5);
- // ... and give it some time start up and open up the port
- Thread.sleep(1000);
-
- // Mocking for sink...
- final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
- final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
- doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
- doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
- final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
- doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
-
- // Kick of the sink ...
- final Registration sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
- Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
- // ... and sync on it starting up
-
- // verify the connection was established and MSG_EMPTY_DATA was transferred
- verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
- verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.of()),
- any(ContainerNode.class));
-
- // generate some deltas
- final int deltaCount = 5;
- generateModification(getDataBroker(), deltaCount);
-
- // verify that all the deltas were transferred and committed + 1 invocation from receiving MSG_EMPTY_DATA
- verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction();
- verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit();
-
- sink.close();
- source.close();
+ try (var source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
+ Duration.ZERO, 5)) {
+ // ... and give it some time start up and open up the port
+ Thread.sleep(1000);
+
+ // Mocking for sink...
+ final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+ final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+ doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+ doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+ final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+ doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
+
+ // Kick of the sink ...
+ try (var sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3)) {
+ // ... and sync on it starting up
+
+ // verify the connection was established and MSG_EMPTY_DATA was transferred
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
+ verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION),
+ eq(YangInstanceIdentifier.of()), any(ContainerNode.class));
+
+ // generate some deltas
+ final int deltaCount = 5;
+ generateModification(getDataBroker(), deltaCount);
+
+ // verify that all the deltas were transferred and committed + 1 invocation from receiving
+ // MSG_EMPTY_DATA
+ verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction();
+ verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit();
+ }
+ }
}
/**
generateModification(getDataBroker(), deltaCount);
// Make sure to start source...
- final Registration source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
- Duration.ZERO, 5);
- // ... and give it some time start up and open up the port
- Thread.sleep(1000);
-
- // Mocking for sink...
- final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
- final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
- doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
- doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
- final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
- doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
-
- // Kick of the sink ...
- final Registration sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
- Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
- // ... and sync on it starting up
-
- // verify the connection was established and MSG_EMPTY_DATA was transferred
- verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
- verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
-
- // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
- ArgumentCaptor<NormalizedNode> dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
- verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
- // verify that the initial state contains everything
- NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next();
- NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
- assertEquals(expectedEntityState, capturedInitialState);
-
- verify(sinkTx, timeout(2000).times(1)).commit();
-
- sink.close();
- source.close();
+ try (var source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
+ Duration.ZERO, 5)) {
+ // ... and give it some time start up and open up the port
+ Thread.sleep(1000);
+
+ // Mocking for sink...
+ final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+ final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+ doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+ doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+ final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+ doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
+
+ // Kick of the sink ...
+ try (var sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3)) {
+ // ... and sync on it starting up
+
+ // verify the connection was established and MSG_EMPTY_DATA was transferred
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
+ verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
+
+ // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
+ final var dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
+ verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
+ // verify that the initial state contains everything
+ NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next();
+ NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
+ assertEquals(expectedEntityState, capturedInitialState);
+
+ verify(sinkTx, timeout(2000).times(1)).commit();
+ }
+ }
}
private static ContainerNode generateNormalizedNodeForEntities(final int amount) {
- final CollectionNodeBuilder<MapEntryNode, SystemMapNode> builder = ImmutableNodes.mapNodeBuilder(ENTITY_QNAME);
+ final var builder = ImmutableNodes.newSystemMapBuilder().withNodeIdentifier(new NodeIdentifier(ENTITY_QNAME));
for (int i = 0; i < amount; i++) {
- builder.withChild(ImmutableNodes.mapEntry(ENTITY_QNAME, ENTITY_NAME_QNAME, "testEntity" + i));
+ final var name = "testEntity" + i;
+ builder.withChild(ImmutableNodes.newMapEntryBuilder()
+ .withNodeIdentifier(NodeIdentifierWithPredicates.of(ENTITY_QNAME, ENTITY_NAME_QNAME, name))
+ .build());
}
- return Builders.containerBuilder()
+ return ImmutableNodes.newContainerBuilder()
.withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
.withChild(builder.build())
.build();