import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.yangtools.concepts.AbstractRegistration;
import org.opendaylight.yangtools.concepts.Registration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class NettyReplication {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyReplication.class);
+
private static final class Disabled extends AbstractRegistration {
@Override
protected void removeRegistration() {
public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
final ClusterSingletonServiceProvider singletonService, final boolean enabled,
final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) {
+ LOG.debug("Sink {}", enabled ? "enabled" : "disabled");
return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled();
}
public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) {
+ LOG.debug("Source {}", enabled ? "enabled" : "disabled");
final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker);
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
- xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
- <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.sink" update-strategy="reload">
+ <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.sink" update-strategy="reload"
+ placeholder-prefix="$(" placeholder-suffix=")">
<cm:default-properties>
<cm:property name="enabled" value="false"/>
<cm:property name="source-host" value="127.0.0.1"/>
</cm:default-properties>
</cm:property-placeholder>
- <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
- <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
- odl:type="default"/>
- <reference id="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
-
<bean id="reconnectDelay" class="java.time.Duration" factory-method="ofMillis">
- <argument value="${reconnect-delay-millis}"/>
+ <argument value="$(reconnect-delay-millis)"/>
</bean>
<bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
- <argument value="${source-host}"/>
+ <argument value="$(source-host)"/>
</bean>
<bean id="nettyReplicationSink" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
<argument ref="bootstrapSupport"/>
<argument ref="dataBroker"/>
<argument ref="singletonServiceProvider"/>
- <argument value="${enabled}"/>
+ <argument value="$(enabled)"/>
<argument ref="sourceAddress"/>
- <argument value="${source-port}"/>
+ <argument value="$(source-port)"/>
<argument ref="reconnectDelay"/>
</bean>
*/
package org.opendaylight.mdsal.replicate.netty;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import java.net.Inet4Address;
import java.time.Duration;
+import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityKey;
import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class IntegrationTest extends AbstractDataBrokerTest {
private static final int TEST_PORT = 4000;
+ private static final QName ENTITY_QNAME = Entity.QNAME;
+ private static final QName ENTITY_NAME_QNAME = QName.create(ENTITY_QNAME, "name");
+
private AbstractBootstrapSupport support;
private DOMClusterSingletonServiceProviderImpl css;
css.close();
}
+ /**
+ * Connect the Source and Sink. Verify that the initial state returned from Source was empty. After that make some
+ * changes to the Source's datastore and verify that they were all replicated to the Sink
+ */
@Test
- public void testSourceToSink() throws InterruptedException {
+ public void testSourceToSink() throws InterruptedException, ExecutionException {
// Make sure to start source...
final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
// ... and give it some time start up and open up the port
final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
// ... and sync on it starting up
- verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
-
- // FIXME: add a few writes to the broker so we have multiple transactions and verify deltas
- verify(sinkChain, timeout(2000)).newWriteOnlyTransaction();
+ // verify the connection was established and MSG_EMPTY_DATA was transferred
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.empty()),
any(ContainerNode.class));
- verify(sinkTx, timeout(1000)).commit();
+
+ // generate some deltas
+ final int deltaCount = 5;
+ generateModification(getDataBroker(), deltaCount);
+
+ // verify that all the deltas were transferred and committed + 1 invocation from receiving MSG_EMPTY_DATA
+ verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction();
+ verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit();
+
+ sink.close();
+ source.close();
+ }
+
+ /**
+ * Add some data first and then start replication. Only 1 change is expected to return from the Source and it
+ * should contain the initial state of the Source's datastore.
+ */
+ @Test
+ public void testReplicateInitialState() throws InterruptedException, ExecutionException {
+ // add some data to datastore
+ final int deltaCount = 5;
+ generateModification(getDataBroker(), deltaCount);
+
+ // Make sure to start source...
+ final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+ // ... and give it some time start up and open up the port
+ Thread.sleep(1000);
+
+ // Mocking for sink...
+ final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+ final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+ doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+ doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+ final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+ doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
+
+ // Kick of the sink ...
+ final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+ // ... and sync on it starting up
+
+ // verify the connection was established and MSG_EMPTY_DATA was transferred
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
+ verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
+
+ // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
+ ArgumentCaptor<NormalizedNode<?, ?>> dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
+ verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
+ // verify that the initial state contains everything
+ NormalizedNode<?, ?> capturedInitialState = dataCaptor.getAllValues().iterator().next();
+ NormalizedNode<?, ?> expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
+ assertEquals(expectedEntityState, capturedInitialState);
+
+ verify(sinkTx, timeout(2000).times(1)).commit();
sink.close();
source.close();
}
+
+ private static ContainerNode generateNormalizedNodeForEntities(final int amount) {
+ final CollectionNodeBuilder<MapEntryNode, MapNode> builder = ImmutableNodes.mapNodeBuilder(ENTITY_QNAME);
+ for (int i = 0; i < amount; i++) {
+ builder.withChild(ImmutableNodes.mapEntry(ENTITY_QNAME, ENTITY_NAME_QNAME, "testEntity" + i));
+ }
+
+ return Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
+ .withChild(builder.build())
+ .build();
+ }
+
+ private static void generateModification(final DataBroker broker, final int amount)
+ throws InterruptedException, ExecutionException {
+ for (int i = 0; i < amount; i++) {
+ final WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
+ final EntityKey key = new EntityKey("testEntity" + i);
+
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION,
+ InstanceIdentifier.builder(Entity.class, key).build(), new EntityBuilder().withKey(key).build());
+ writeTransaction.commit().get();
+ }
+ }
}