From: Robert Varga Date: Mon, 22 Jun 2020 12:44:30 +0000 (+0200) Subject: Add a unit test for netty replication X-Git-Tag: v4.0.16~10 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=mdsal.git;a=commitdiff_plain;h=625453e1d93ae3f66131bc1c87059b3a233fa073 Add a unit test for netty replication This is a simple unit test, which we can use to validate establishment of source connection. This flushes out a few bugs, which are also addressed. Change-Id: Ie8e5ae1772ed5dc544f434d7db44b2b31b54443b Signed-off-by: Robert Varga --- diff --git a/replicate/mdsal-replicate-netty/pom.xml b/replicate/mdsal-replicate-netty/pom.xml index ab576718aa..72c0b1b8db 100644 --- a/replicate/mdsal-replicate-netty/pom.xml +++ b/replicate/mdsal-replicate-netty/pom.xml @@ -38,6 +38,25 @@ true + + org.opendaylight.mdsal + mdsal-binding-dom-adapter + test-jar + + + org.opendaylight.mdsal + mdsal-binding-test-utils + + + org.opendaylight.mdsal + mdsal-eos-dom-simple + test + + + org.opendaylight.mdsal + mdsal-singleton-dom-impl + test + diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java index 926a7761d3..a77a78c99f 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java @@ -33,7 +33,7 @@ public abstract class AbstractBootstrapSupport implements AutoCloseable, Bootstr this.workerGroup = requireNonNull(workerGroup); } - public static @NonNull BootstrapSupport create() { + public static @NonNull AbstractBootstrapSupport create() { if (Epoll.isAvailable()) { return new EpollBootstrapSupport(); } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java index eddf8d6dba..c9611bec6c 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java @@ -42,8 +42,9 @@ final class Constants { */ static final int LENGTH_FIELD_MAX = 1024 * 1024; - static final ByteBuf EMPTY_DATA = Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA }); - static final ByteBuf DTC_APPLY = Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY }); + static final ByteBuf EMPTY_DATA = Unpooled.unreleasableBuffer( + Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA })); + static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY })); private Constants() { // Hidden on purpose diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java index f6d934781f..da0f51574a 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java @@ -12,13 +12,9 @@ import io.netty.handler.codec.LengthFieldPrepender; @Sharable final class MessageFrameEncoder extends LengthFieldPrepender { - private static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder(); + static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder(); private MessageFrameEncoder() { super(Constants.LENGTH_FIELD_LENGTH); } - - static MessageFrameEncoder instance() { - return INSTANCE; - } } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java index 7ead9c26e2..5a29573ade 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java @@ -27,16 +27,21 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.mdsal.replicate.common.DataTreeCandidateUtils; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class SinkRequestHandler extends SimpleChannelInboundHandler { private static final Logger LOG = LoggerFactory.getLogger(SinkRequestHandler.class); + private static final ContainerNode EMPTY_ROOT = ImmutableNodes.containerNode(SchemaContext.NAME); private final ReusableStreamReceiver receiver = ReusableImmutableNormalizedNodeStreamWriter.create(); private final List chunks = new ArrayList<>(); @@ -72,7 +77,12 @@ final class SinkRequestHandler extends SimpleChannelInboundHandler { private void handleEmptyData() { final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction(); - tx.delete(tree.getDatastoreType(), tree.getRootIdentifier()); + + if (tree.getRootIdentifier().isEmpty()) { + tx.put(tree.getDatastoreType(), YangInstanceIdentifier.empty(), EMPTY_ROOT); + } else { + tx.delete(tree.getDatastoreType(), tree.getRootIdentifier()); + } commit(tx); } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java index c2b5f4a838..f71e5af1fc 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java @@ -16,25 +16,29 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.Future; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class SinkSingletonService implements ClusterSingletonService { +final class SinkSingletonService extends ChannelInitializer implements ClusterSingletonService { private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class); private static final ServiceGroupIdentifier SGID = ServiceGroupIdentifier.create(SinkSingletonService.class.getName()); @@ -45,7 +49,7 @@ final class SinkSingletonService implements ClusterSingletonService { static { try { - TREE_REQUEST = requestTree(TREE); + TREE_REQUEST = Unpooled.unreleasableBuffer(requestTree(TREE)); } catch (IOException e) { throw new ExceptionInInitializerError(e); } @@ -65,6 +69,7 @@ final class SinkSingletonService implements ClusterSingletonService { this.dataBroker = requireNonNull(dataBroker); this.sourceAddress = requireNonNull(sourceAddress); this.reconnectDelay = requireNonNull(reconnectDelay); + LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress); } @Override @@ -81,6 +86,7 @@ final class SinkSingletonService implements ClusterSingletonService { futureChannel = bs .option(ChannelOption.SO_KEEPALIVE, true) + .handler(this) .connect(sourceAddress, null); futureChannel.addListener(compl -> channelResolved(compl, group)); @@ -88,30 +94,39 @@ final class SinkSingletonService implements ClusterSingletonService { @Override public synchronized ListenableFuture closeServiceInstance() { - // TODO Auto-generated method stub - return null; + // FIXME: initiate orderly shutdown + return FluentFutures.immediateNullFluentFuture(); + } + + @Override + protected void initChannel(final SocketChannel ch) { + ch.pipeline() + .addLast("frameDecoder", new MessageFrameDecoder()) + .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( + new SinkTransactionChainListener(ch)))) + .addLast("frameEncoder", MessageFrameEncoder.INSTANCE); } private synchronized void channelResolved(final Future completedFuture, final ScheduledExecutorService group) { - if (completedFuture != futureChannel) { - // Future changed, this callback is irrelevant + final Throwable cause = completedFuture.cause(); + if (cause != null) { + LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause); + group.schedule(() -> { + // FIXME: perform reconnect + }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS); return; } - final Channel channel = futureChannel.channel(); - channel.pipeline() - .addLast("frameDecoder", new MessageFrameDecoder()) - .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( - new SinkTransactionChainListener(channel)))) - .addLast("frameEncoder", MessageFrameEncoder.instance()); - - channel.writeAndFlush(TREE_REQUEST); + final Channel ch = futureChannel.channel(); + LOG.info("Channel {} established", ch); + ch.writeAndFlush(TREE_REQUEST); } private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException { final ByteBuf ret = Unpooled.buffer(); try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) { + stream.writeByte(Constants.MSG_SUBSCRIBE_REQ); try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) { tree.getDatastoreType().writeTo(output); output.writeYangInstanceIdentifier(tree.getRootIdentifier()); diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java index f302ca156d..b7811f0184 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java @@ -82,11 +82,13 @@ final class SourceRequestHandler extends SimpleChannelInboundHandler { reg = dtcs.registerDataTreeChangeListener(dataTree, new ClusteredDOMDataTreeChangeListener() { @Override public void onInitialData() { + LOG.debug("Channel {} tree {} has empty data", channel, dataTree); channel.writeAndFlush(AbstractSourceMessage.empty()); } @Override public void onDataTreeChanged(final Collection changes) { + LOG.debug("Channel {} tree {} has {} changes", channel, dataTree, changes.size()); channel.writeAndFlush(AbstractSourceMessage.of(changes)); } }); diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java index 415637269b..2d2b35ecfb 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java @@ -53,6 +53,7 @@ final class SourceSingletonService extends ChannelInitializer imp this.bootstrapSupport = requireNonNull(bootstrapSupport); this.dtcs = requireNonNull(dtcs); this.listenPort = listenPort; + LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort); } @Override @@ -112,11 +113,12 @@ final class SourceSingletonService extends ChannelInitializer imp ch.pipeline() .addLast("frameDecoder", new MessageFrameDecoder()) .addLast("requestHandler", new SourceRequestHandler(dtcs)) - .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current())) - .addLast("frameEncoder", MessageFrameEncoder.instance()); + // Output, in reverse order + .addLast("frameEncoder", MessageFrameEncoder.INSTANCE) + .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current())); children.add(ch); - LOG.debug("Channel {} established", ch); + LOG.info("Channel {} established", ch); } private static ListenableFuture closeChannel(final Channel ch) { diff --git a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java new file mode 100644 index 0000000000..6261bd3f9f --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import java.net.Inet4Address; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; +import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService; +import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class IntegrationTest extends AbstractDataBrokerTest { + private static final int TEST_PORT = 4000; + + private AbstractBootstrapSupport support; + private DOMClusterSingletonServiceProviderImpl css; + + @Before + public void before() { + support = AbstractBootstrapSupport.create(); + css = new DOMClusterSingletonServiceProviderImpl(new SimpleDOMEntityOwnershipService()); + css.initializeProvider(); + } + + @After + public void after() throws InterruptedException { + support.close(); + css.close(); + } + + @Test + public void testSourceToSink() throws InterruptedException { + // Make sure to start source... + final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT); + // ... and give it some time start up and open up the port + Thread.sleep(1000); + + // Mocking for sink... + final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class); + final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class); + doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit(); + doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction(); + final DOMDataBroker sinkBroker = mock(DOMDataBroker.class); + doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any()); + + // Kick of the sink ... + final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true, + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO); + // ... and sync on it starting up + verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any()); + + // FIXME: add a few writes to the broker so we have multiple transactions and verify deltas + + verify(sinkChain, timeout(2000)).newWriteOnlyTransaction(); + verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.empty()), + any(ContainerNode.class)); + verify(sinkTx, timeout(1000)).commit(); + + sink.close(); + source.close(); + } +}