<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-binding-dom-adapter</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-binding-test-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-eos-dom-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-singleton-dom-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<scm>
this.workerGroup = requireNonNull(workerGroup);
}
- public static @NonNull BootstrapSupport create() {
+ public static @NonNull AbstractBootstrapSupport create() {
if (Epoll.isAvailable()) {
return new EpollBootstrapSupport();
}
*/
static final int LENGTH_FIELD_MAX = 1024 * 1024;
- static final ByteBuf EMPTY_DATA = Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA });
- static final ByteBuf DTC_APPLY = Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY });
+ // Shared single-byte message constants. unreleasableBuffer() prevents the outbound pipeline
+ // from releasing (and thus invalidating) these shared instances after their first write.
+ // NOTE(review): writing still advances the shared readerIndex -- confirm usage sites write a
+ // duplicate()/slice() if these are written more than once.
+ static final ByteBuf EMPTY_DATA = Unpooled.unreleasableBuffer(
+ Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA }));
+ static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY }));
private Constants() {
// Hidden on purpose
@Sharable
+ // Stateless length-field prepender, @Sharable so a single INSTANCE can be installed into every
+ // channel pipeline. The instance() accessor added no value over direct constant access, hence
+ // the field is exposed package-private instead.
final class MessageFrameEncoder extends LengthFieldPrepender {
- private static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder();
+ static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder();
private MessageFrameEncoder() {
super(Constants.LENGTH_FIELD_LENGTH);
}
-
- static MessageFrameEncoder instance() {
- return INSTANCE;
- }
}
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
import org.opendaylight.mdsal.replicate.common.DataTreeCandidateUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class SinkRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger LOG = LoggerFactory.getLogger(SinkRequestHandler.class);
+ private static final ContainerNode EMPTY_ROOT = ImmutableNodes.containerNode(SchemaContext.NAME);
private final ReusableStreamReceiver receiver = ReusableImmutableNormalizedNodeStreamWriter.create();
private final List<ByteBuf> chunks = new ArrayList<>();
private void handleEmptyData() {
final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction();
- tx.delete(tree.getDatastoreType(), tree.getRootIdentifier());
+
+ // An empty root identifier means we replicate the entire datastore. Presumably the root
+ // itself cannot be delete()d, so model "source is empty" as a put() of an empty root
+ // container instead -- NOTE(review): confirm against the datastore contract. Any narrower
+ // subtree is simply removed.
+ if (tree.getRootIdentifier().isEmpty()) {
+ tx.put(tree.getDatastoreType(), YangInstanceIdentifier.empty(), EMPTY_ROOT);
+ } else {
+ tx.delete(tree.getDatastoreType(), tree.getRootIdentifier());
+ }
commit(tx);
}
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class SinkSingletonService implements ClusterSingletonService {
+final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
private static final ServiceGroupIdentifier SGID =
ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
static {
try {
- TREE_REQUEST = requestTree(TREE);
+ // unreleasableBuffer() keeps this shared request constant valid across repeated writes
+ // (e.g. reconnect attempts); a plain buffer would be released after its first write.
+ // NOTE(review): a write still advances the shared readerIndex -- see channelResolved().
+ TREE_REQUEST = Unpooled.unreleasableBuffer(requestTree(TREE));
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
this.dataBroker = requireNonNull(dataBroker);
this.sourceAddress = requireNonNull(sourceAddress);
this.reconnectDelay = requireNonNull(reconnectDelay);
+ LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
}
@Override
futureChannel = bs
.option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(this)
.connect(sourceAddress, null);
futureChannel.addListener(compl -> channelResolved(compl, group));
@Override
public synchronized ListenableFuture<?> closeServiceInstance() {
- // TODO Auto-generated method stub
- return null;
+ // FIXME: initiate orderly shutdown
+ // Returning an immediate future instead of null keeps the singleton provider's
+ // close sequence from tripping over a null result.
+ return FluentFutures.immediateNullFluentFuture();
+ }
+
+ @Override
+ protected void initChannel(final SocketChannel ch) {
+ // Pipeline: inbound frame decode -> request handling; outbound frame encode. Set up via
+ // ChannelInitializer (this class) rather than after connect completes, presumably so no
+ // inbound data can arrive before the handlers are installed -- NOTE(review): confirm.
+ ch.pipeline()
+ .addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
+ new SinkTransactionChainListener(ch))))
+ .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
}
private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
- if (completedFuture != futureChannel) {
- // Future changed, this callback is irrelevant
+ // NOTE(review): the previous 'completedFuture != futureChannel' staleness guard was dropped;
+ // a late callback from a superseded connect attempt would now fall through -- confirm that
+ // cannot happen, or reinstate the identity check.
+ final Throwable cause = completedFuture.cause();
+ if (cause != null) {
+ LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
+ group.schedule(() -> {
+ // FIXME: perform reconnect
+ }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
return;
}
- final Channel channel = futureChannel.channel();
- channel.pipeline()
- .addLast("frameDecoder", new MessageFrameDecoder())
- .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
- new SinkTransactionChainListener(channel))))
- .addLast("frameEncoder", MessageFrameEncoder.instance());
-
- channel.writeAndFlush(TREE_REQUEST);
+ final Channel ch = futureChannel.channel();
+ LOG.info("Channel {} established", ch);
+ // TREE_REQUEST is unreleasable, but writing it still advances its readerIndex.
+ // NOTE(review): if this can run more than once (reconnects), write TREE_REQUEST.duplicate().
+ ch.writeAndFlush(TREE_REQUEST);
}
private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
final ByteBuf ret = Unpooled.buffer();
try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
+ stream.writeByte(Constants.MSG_SUBSCRIBE_REQ);
try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
tree.getDatastoreType().writeTo(output);
output.writeYangInstanceIdentifier(tree.getRootIdentifier());
reg = dtcs.registerDataTreeChangeListener(dataTree, new ClusteredDOMDataTreeChangeListener() {
@Override
public void onInitialData() {
+ LOG.debug("Channel {} tree {} has empty data", channel, dataTree);
channel.writeAndFlush(AbstractSourceMessage.empty());
}
@Override
public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
+ LOG.debug("Channel {} tree {} has {} changes", channel, dataTree, changes.size());
channel.writeAndFlush(AbstractSourceMessage.of(changes));
}
});
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dtcs = requireNonNull(dtcs);
this.listenPort = listenPort;
+ LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
}
@Override
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
.addLast("requestHandler", new SourceRequestHandler(dtcs))
- .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
- .addLast("frameEncoder", MessageFrameEncoder.instance());
+ // Output, in reverse order
+ .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
+ .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
children.add(ch);
- LOG.debug("Channel {} established", ch);
+ LOG.info("Channel {} established", ch);
}
private static ListenableFuture<Void> closeChannel(final Channel ch) {
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.net.InetAddress;
+import java.time.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
+import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+
+/**
+ * End-to-end replication test: a real source backed by {@link AbstractDataBrokerTest}'s broker
+ * talks over a loopback TCP connection to a sink whose {@link DOMDataBroker} is mocked. We then
+ * verify the initial empty-data message arrives at the sink as a put() of an empty root container.
+ */
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class IntegrationTest extends AbstractDataBrokerTest {
+    // FIXME: a fixed port can collide with other tests or processes on the host; prefer binding
+    //        an ephemeral port and discovering it from the source
+    private static final int TEST_PORT = 4000;
+
+    private AbstractBootstrapSupport support;
+    private DOMClusterSingletonServiceProviderImpl css;
+
+    @Before
+    public void before() {
+        support = AbstractBootstrapSupport.create();
+        // Local-only entity ownership service, so this node is expected to win mastership and
+        // both the source and sink singleton services are instantiated right away
+        css = new DOMClusterSingletonServiceProviderImpl(new SimpleDOMEntityOwnershipService());
+        css.initializeProvider();
+    }
+
+    @After
+    public void after() throws InterruptedException {
+        support.close();
+        css.close();
+    }
+
+    @Test
+    public void testSourceToSink() throws InterruptedException {
+        // Make sure to start source...
+        final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+        // ... and give it some time start up and open up the port
+        // FIXME: a fixed sleep is inherently racy; poll for the port becoming connectable instead
+        Thread.sleep(1000);
+
+        // Mocking for sink: capture the transaction chain and transaction the sink will create
+        final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+        final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+        doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+        doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+        final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+        doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
+
+        // Kick off the sink. Note: use InetAddress.getLoopbackAddress() -- the previous
+        // Inet4Address.getLoopbackAddress() merely invoked the same inherited static method and
+        // did not guarantee an IPv4 address.
+        final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
+            InetAddress.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+        // ... and sync on it starting up
+        verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
+
+        // FIXME: add a few writes to the broker so we have multiple transactions and verify deltas
+
+        // The source datastore is empty, so the sink should observe a single transaction which
+        // replaces the (empty) root container in CONFIGURATION and commits
+        verify(sinkChain, timeout(2000)).newWriteOnlyTransaction();
+        verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.empty()),
+            any(ContainerNode.class));
+        verify(sinkTx, timeout(1000)).commit();
+
+        sink.close();
+        source.close();
+    }
+}