<version>5.0.12-SNAPSHOT</version>
</dependency>
+ <!-- Replication -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mdsal-replicate-common</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mdsal-replicate-netty</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>odl-mdsal-exp-replicate-netty</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
+
<!-- MODELS -->
<dependency>
<groupId>org.opendaylight.mdsal.model</groupId>
<artifactId>mdsal-yanglib-rfc8525</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-replicate-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-replicate-netty</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.opendaylight.mdsal.model</groupId>
<artifactId>odl-uint24</artifactId>
<classifier>features</classifier>
<type>xml</type>
</dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>odl-mdsal-exp-replicate-netty</artifactId>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
</dependencies>
</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>feature-parent</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ <relativePath>../feature-parent</relativePath>
+ </parent>
+
+ <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+ <packaging>feature</packaging>
+
+ <name>OpenDaylight :: MD-SAL :: Replicate :: Common</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>odl-mdsal-dom-api</artifactId>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-replicate-common</artifactId>
+ </dependency>
+ </dependencies>
+</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>feature-parent</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ <relativePath>../feature-parent</relativePath>
+ </parent>
+
+ <artifactId>odl-mdsal-exp-replicate-netty</artifactId>
+ <packaging>feature</packaging>
+
+ <name>OpenDaylight :: MD-SAL :: Replicate :: Netty</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-replicate-netty</artifactId>
+ </dependency>
+ </dependencies>
+</project>
<module>odl-mdsal-model-odl-uint24</module>
<module>odl-mdsal-uint24-netty</module>
+ <!-- Datastore replication -->
+ <module>odl-mdsal-exp-replicate-common</module>
+ <module>odl-mdsal-exp-replicate-netty</module>
+
<!-- Models -->
<!-- Standards -->
<module>odl-mdsal-model-rfc6991</module>
<module>singleton-service</module>
<module>trace</module>
+ <!-- Data store replicators -->
+ <module>replicate</module>
+
<!-- IETF YANG (Module) Library -->
<module>yanglib</module>
</modules>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>dom-parent</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ <relativePath>../../dom/dom-parent</relativePath>
+ </parent>
+
+ <artifactId>mdsal-replicate-common</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-codec-binfmt</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-dom-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-singleton-common-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <scm>
+ <connection>scm:git:http://git.opendaylight.org/gerrit/mdsal.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/mdsal.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/MD-SAL:Main</url>
+ </scm>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.common;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMDataBrokerModification implements DataTreeModification {
+ private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerModification.class);
+
+ private final DOMDataTreeWriteOperations transaction;
+ private final LogicalDatastoreType datastore;
+
+ DOMDataBrokerModification(final DOMDataTreeWriteOperations transaction, final LogicalDatastoreType datastore) {
+ this.transaction = requireNonNull(transaction);
+ this.datastore = requireNonNull(datastore);
+ }
+
+ @Override
+ public void delete(final YangInstanceIdentifier path) {
+ LOG.trace("BackupModification - DELETE - {}", path);
+ transaction.delete(datastore, path);
+ }
+
+ @Override
+ public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ LOG.trace("BackupModification - WRITE - {} - DATA: {}", path, data);
+ transaction.put(datastore, path, data);
+ }
+
+ @Override
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void ready() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void applyToCursor(final DataTreeModificationCursor cursor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataTreeModification newModification() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SchemaContext getSchemaContext() {
+ throw new UnsupportedOperationException();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.common;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMStoreModification implements DataTreeModification {
+ private static final Logger LOG = LoggerFactory.getLogger(DOMStoreModification.class);
+
+ private final DOMStoreWriteTransaction transaction;
+
+ DOMStoreModification(final DOMStoreWriteTransaction transaction) {
+ this.transaction = requireNonNull(transaction);
+ }
+
+ @Override
+ public void delete(final YangInstanceIdentifier path) {
+ LOG.trace("Delete {}", path);
+ transaction.delete(path);
+ }
+
+ @Override
+ public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ LOG.trace("Write {} data {}", path, data);
+ transaction.write(path, data);
+ }
+
+ @Override
+ public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NonNull DataTreeModification newModification() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SchemaContext getSchemaContext() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void ready() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void applyToCursor(@NonNull final DataTreeModificationCursor cursor) {
+ throw new UnsupportedOperationException();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.common;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+
+@Beta
+public final class DataTreeCandidateUtils {
+ private DataTreeCandidateUtils() {
+ // Hidden on purpose
+ }
+
+ public static void applyToTransaction(final DOMDataTreeWriteOperations transaction,
+ final LogicalDatastoreType datastore, final DataTreeCandidate candidate) {
+ DataTreeCandidates.applyToModification(new DOMDataBrokerModification(transaction, datastore), candidate);
+ }
+
+ public static void applyToTransaction(final DOMStoreWriteTransaction transaction,
+ final DataTreeCandidate candidate) {
+ DataTreeCandidates.applyToModification(new DOMStoreModification(transaction), candidate);
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>dom-parent</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ <relativePath>../../dom/dom-parent</relativePath>
+ </parent>
+
+ <artifactId>mdsal-replicate-netty</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-replicate-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ </dependencies>
+
+ <scm>
+ <connection>scm:git:http://git.opendaylight.org/gerrit/mdsal.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/mdsal.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/MD-SAL:Main</url>
+ </scm>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.socket.ServerSocketChannel;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.NonNull;
+
+public abstract class AbstractBootstrapSupport implements AutoCloseable, BootstrapSupport {
+ private final Class<? extends Channel> channelClass;
+ private final Class<? extends ServerSocketChannel> serverChannelClass;
+ private final EventLoopGroup bossGroup;
+ private final EventLoopGroup workerGroup;
+
+ AbstractBootstrapSupport(final Class<? extends Channel> channelClass,
+ final Class<? extends ServerSocketChannel> serverChannelClass, final EventLoopGroup bossGroup,
+ final EventLoopGroup workerGroup) {
+ this.channelClass = requireNonNull(channelClass);
+ this.serverChannelClass = requireNonNull(serverChannelClass);
+ this.bossGroup = requireNonNull(bossGroup);
+ this.workerGroup = requireNonNull(workerGroup);
+ }
+
+ public static @NonNull BootstrapSupport create() {
+ if (Epoll.isAvailable()) {
+ return new EpollBootstrapSupport();
+ }
+ return new NioBootstrapSupport();
+ }
+
+ @Override
+ public final Bootstrap newBootstrap() {
+ return new Bootstrap().group(workerGroup).channel(channelClass);
+ }
+
+ @Override
+ public final ServerBootstrap newServerBootstrap() {
+ return new ServerBootstrap().group(bossGroup, workerGroup).channel(serverChannelClass);
+ }
+
+ @Override
+ public final void close() throws InterruptedException {
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+
+ bossGroup.awaitTermination(10, TimeUnit.SECONDS);
+ workerGroup.awaitTermination(10, TimeUnit.SECONDS);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+
+abstract class AbstractSourceMessage {
+ private static final class Empty extends AbstractSourceMessage {
+ @Override
+ void encodeTo(final NormalizedNodeStreamVersion version, final List<Object> out) throws IOException {
+ out.add(Constants.EMPTY_DATA);
+ }
+ }
+
+ private static final class Deltas extends AbstractSourceMessage {
+ private final Collection<DataTreeCandidate> deltas;
+
+ Deltas(final Collection<DataTreeCandidate> deltas) {
+ this.deltas = requireNonNull(deltas);
+ }
+
+ @Override
+ void encodeTo(final NormalizedNodeStreamVersion version, final List<Object> out) throws IOException {
+ for (DataTreeCandidate candidate : deltas) {
+ try (DataOutputStream stream = new DataOutputStream(new SplittingOutputStream(out))) {
+ try (NormalizedNodeDataOutput output = version.newDataOutput(stream)) {
+ DataTreeCandidateInputOutput.writeDataTreeCandidate(output, candidate);
+ }
+ }
+ out.add(Constants.DTC_APPLY);
+ }
+ }
+ }
+
+ private static final AbstractSourceMessage EMPTY = new Empty();
+
+ static AbstractSourceMessage empty() {
+ return EMPTY;
+ }
+
+ static AbstractSourceMessage of(final Collection<DataTreeCandidate> deltas) {
+ return new Deltas(deltas);
+ }
+
+ abstract void encodeTo(NormalizedNodeStreamVersion version, List<Object> out) throws IOException;
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import com.google.common.annotations.Beta;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import org.eclipse.jdt.annotation.NonNull;
+
+@Beta
+public interface BootstrapSupport {
+
+ @NonNull Bootstrap newBootstrap();
+
+ @NonNull ServerBootstrap newServerBootstrap();
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+final class Constants {
+ /**
+ * Subscribe request message. This is the only valid initial message in the sink->source direction. Its payload is
+ * composed of a binary normalized node stream. The stream must contain a {@link LogicalDatastoreType} serialized
+ * via {@link LogicalDatastoreType#writeTo(java.io.DataOutput)} followed by a single {@link YangInstanceIdentifier}.
+ */
+ static final byte MSG_SUBSCRIBE_REQ = 1;
+ /**
+ * Initial data indicating non-presence of the subscribed path.
+ */
+ static final byte MSG_EMPTY_DATA = 2;
+ /**
+ * A chunk of the DataTreeCandidate serialization stream. May be followed by another chunk or
+ * {@link #MSG_DTC_APPLY}.
+ */
+ static final byte MSG_DTC_CHUNK = 3;
+ /**
+ * End-of-DataTreeCandidate serialization stream. The payload is empty.
+ */
+ static final byte MSG_DTC_APPLY = 4;
+
+ /**
+ * Length of the length field in each transmitted frame.
+ */
+ static final int LENGTH_FIELD_LENGTH = 4;
+ /**
+ * Maximum frame size allowed by encoding, 1MiB.
+ */
+ static final int LENGTH_FIELD_MAX = 1024 * 1024;
+
+ static final ByteBuf EMPTY_DATA = Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA });
+ static final ByteBuf DTC_APPLY = Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY });
+
+ private Constants() {
+ // Hidden on purpose
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+
+final class DeltaEncoder extends MessageToMessageEncoder<AbstractSourceMessage> {
+ private final NormalizedNodeStreamVersion version;
+
+ DeltaEncoder(final NormalizedNodeStreamVersion version) {
+ this.version = requireNonNull(version);
+ }
+
+ @Override
+ protected void encode(final ChannelHandlerContext ctx, final AbstractSourceMessage msg, final List<Object> out)
+ throws IOException {
+ msg.encodeTo(version, out);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+
+final class EpollBootstrapSupport extends AbstractBootstrapSupport {
+ EpollBootstrapSupport() {
+ super(EpollSocketChannel.class, EpollServerSocketChannel.class, new EpollEventLoopGroup(),
+ new EpollEventLoopGroup());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+final class MessageFrameDecoder extends LengthFieldBasedFrameDecoder {
+ MessageFrameDecoder() {
+ super(Constants.LENGTH_FIELD_MAX, 0, Constants.LENGTH_FIELD_LENGTH, 0, Constants.LENGTH_FIELD_LENGTH);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+@Sharable
+final class MessageFrameEncoder extends LengthFieldPrepender {
+ private static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder();
+
+ private MessageFrameEncoder() {
+ super(Constants.LENGTH_FIELD_LENGTH);
+ }
+
+ static MessageFrameEncoder instance() {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Verify.verify;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.yangtools.concepts.AbstractRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+
+public final class NettyReplication {
+ private static final class Disabled extends AbstractRegistration {
+ @Override
+ protected void removeRegistration() {
+ // no-op
+ }
+ }
+
+ private NettyReplication() {
+ // Hidden on purpose
+ }
+
+ public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
+ final ClusterSingletonServiceProvider singletonService, final boolean enabled,
+ final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) {
+ return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
+ dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled();
+ }
+
+ public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
+ final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) {
+ final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
+ verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker);
+
+ return enabled ? singletonService.registerClusterSingletonService(new SourceSingletonService(bootstrapSupport,
+ dtcs, listenPort)) : new Disabled();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+final class NioBootstrapSupport extends AbstractBootstrapSupport {
+ NioBootstrapSupport() {
+ super(NioSocketChannel.class, NioServerSocketChannel.class, new NioEventLoopGroup(), new NioEventLoopGroup());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.replicate.common.DataTreeCandidateUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SinkRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ private static final Logger LOG = LoggerFactory.getLogger(SinkRequestHandler.class);
+
+ private final ReusableStreamReceiver receiver = ReusableImmutableNormalizedNodeStreamWriter.create();
+ private final List<ByteBuf> chunks = new ArrayList<>();
+ private final DOMDataTreeIdentifier tree;
+ private final DOMTransactionChain chain;
+
+ SinkRequestHandler(final DOMDataTreeIdentifier tree, final DOMTransactionChain chain) {
+ this.tree = requireNonNull(tree);
+ this.chain = requireNonNull(chain);
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws IOException {
+ verify(msg.isReadable(), "Empty message received");
+
+ final short msgType = msg.readUnsignedByte();
+ final Channel channel = ctx.channel();
+ LOG.trace("Channel {} received message type {}", channel, msgType);
+ switch (msgType) {
+ case Constants.MSG_EMPTY_DATA:
+ handleEmptyData();
+ break;
+ case Constants.MSG_DTC_CHUNK:
+ chunks.add(msg);
+ break;
+ case Constants.MSG_DTC_APPLY:
+ handleDtcApply();
+ break;
+ default:
+ throw new IllegalStateException("Unexpected message type " + msgType);
+ }
+ }
+
+ private void handleEmptyData() {
+ final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction();
+ tx.delete(tree.getDatastoreType(), tree.getRootIdentifier());
+ commit(tx);
+ }
+
+ private void handleDtcApply() throws IOException {
+ checkState(!chunks.isEmpty(), "No chunks to apply");
+
+ final ByteBuf bufs = Unpooled.wrappedBuffer(chunks.toArray(new ByteBuf[0]));
+ chunks.clear();
+
+ final DataTreeCandidate candidate;
+ try (ByteBufInputStream stream = new ByteBufInputStream(bufs)) {
+ candidate = DataTreeCandidateInputOutput.readDataTreeCandidate(NormalizedNodeDataInput.newDataInput(stream),
+ receiver);
+ }
+
+ final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction();
+ DataTreeCandidateUtils.applyToTransaction(tx, tree.getDatastoreType(), candidate);
+ commit(tx);
+ }
+
+ private static void commit(final DOMDataTreeWriteTransaction tx) {
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.trace("Transaction committed with {}", result);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ // Handled by transaction chain listener
+ }
+ }, MoreExecutors.directExecutor());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.util.concurrent.Future;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SinkSingletonService implements ClusterSingletonService {
+ private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
+ private static final ServiceGroupIdentifier SGID =
+ ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
+ // TODO: allow different trees?
+ private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.empty());
+ private static final ByteBuf TREE_REQUEST;
+
+ static {
+ try {
+ TREE_REQUEST = requestTree(TREE);
+ } catch (IOException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ private final BootstrapSupport bootstrapSupport;
+ private final DOMDataBroker dataBroker;
+ private final InetSocketAddress sourceAddress;
+ private final Duration reconnectDelay;
+
+ @GuardedBy("this")
+ private ChannelFuture futureChannel;
+
+ SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
+ final InetSocketAddress sourceAddress, final Duration reconnectDelay) {
+ this.bootstrapSupport = requireNonNull(bootstrapSupport);
+ this.dataBroker = requireNonNull(dataBroker);
+ this.sourceAddress = requireNonNull(sourceAddress);
+ this.reconnectDelay = requireNonNull(reconnectDelay);
+ }
+
+ @Override
+ public ServiceGroupIdentifier getIdentifier() {
+ return SGID;
+ }
+
+ @Override
+ public synchronized void instantiateServiceInstance() {
+ LOG.info("Replication sink started with source {}", sourceAddress);
+
+ final Bootstrap bs = bootstrapSupport.newBootstrap();
+ final ScheduledExecutorService group = bs.config().group();
+
+ futureChannel = bs
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .connect(sourceAddress, null);
+
+ futureChannel.addListener(compl -> channelResolved(compl, group));
+ }
+
+ @Override
+ public synchronized ListenableFuture<?> closeServiceInstance() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
+ if (completedFuture != futureChannel) {
+ // Future changed, this callback is irrelevant
+ return;
+ }
+
+ final Channel channel = futureChannel.channel();
+ channel.pipeline()
+ .addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
+ new SinkTransactionChainListener(channel))))
+ .addLast("frameEncoder", MessageFrameEncoder.instance());
+
+ channel.writeAndFlush(TREE_REQUEST);
+ }
+
+ private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
+ final ByteBuf ret = Unpooled.buffer();
+
+ try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
+ try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
+ tree.getDatastoreType().writeTo(output);
+ output.writeYangInstanceIdentifier(tree.getRootIdentifier());
+ }
+ }
+
+ return ret;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.Channel;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SinkTransactionChainListener implements DOMTransactionChainListener {
+ private static final Logger LOG = LoggerFactory.getLogger(SinkTransactionChainListener.class);
+
+ private final Channel channel;
+
+ SinkTransactionChainListener(final Channel channel) {
+ this.channel = requireNonNull(channel);
+ }
+
+ @Override
+ public void onTransactionChainFailed(final DOMTransactionChain chain, final DOMDataTreeTransaction transaction,
+ final Throwable cause) {
+ LOG.error("Transaction chain for channel {} failed", channel, cause);
+ channel.close();
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(final DOMTransactionChain chain) {
+ LOG.info("Transaction chain for channel {} completed", channel);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.io.IOException;
+import java.util.Collection;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Final inbound handler on source side. Handles requests coming from sink and reacts to them.
+ */
+final class SourceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ private static final Logger LOG = LoggerFactory.getLogger(SourceRequestHandler.class);
+
+ private final DOMDataTreeChangeService dtcs;
+
+ private ListenerRegistration<?> reg;
+
+ SourceRequestHandler(final DOMDataTreeChangeService dtcs) {
+ this.dtcs = requireNonNull(dtcs);
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ LOG.trace("Channel {} going inactive", ctx.channel());
+ if (reg != null) {
+ reg.close();
+ reg = null;
+ }
+ ctx.fireChannelInactive();
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws IOException {
+ verify(msg.isReadable(), "Empty message received");
+
+ final short msgType = msg.readUnsignedByte();
+ final Channel channel = ctx.channel();
+ LOG.trace("Channel {} received message type {}", channel, msgType);
+ switch (msgType) {
+ case Constants.MSG_SUBSCRIBE_REQ:
+ subscribe(channel, msg);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected message type " + msgType);
+ }
+ }
+
+ private void subscribe(final Channel channel, final ByteBuf msg) throws IOException {
+ verify(reg == null, "Unexpected subscription when already subscribed");
+
+ final DOMDataTreeIdentifier dataTree;
+ try (ByteBufInputStream input = new ByteBufInputStream(msg)) {
+ final NormalizedNodeDataInput normalizedInput = NormalizedNodeDataInput.newDataInput(input);
+
+ dataTree = new DOMDataTreeIdentifier(LogicalDatastoreType.readFrom(normalizedInput),
+ normalizedInput.readYangInstanceIdentifier());
+ }
+
+ LOG.info("Channel {} subscribing to {}", channel, dataTree);
+ reg = dtcs.registerDataTreeChangeListener(dataTree, new ClusteredDOMDataTreeChangeListener() {
+ @Override
+ public void onInitialData() {
+ channel.writeAndFlush(AbstractSourceMessage.empty());
+ }
+
+ @Override
+ public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
+ channel.writeAndFlush(AbstractSourceMessage.of(changes));
+ }
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
+ * for a particular port.
+ */
+final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
+ private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
+ private static final ServiceGroupIdentifier SGID =
+ ServiceGroupIdentifier.create(SourceSingletonService.class.getName());
+
+ private final BootstrapSupport bootstrapSupport;
+ private final DOMDataTreeChangeService dtcs;
+ private final int listenPort;
+
+ @GuardedBy("this")
+ private final Collection<SocketChannel> children = new HashSet<>();
+ @GuardedBy("this")
+ private Channel serverChannel;
+
+ SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
+ final int listenPort) {
+ this.bootstrapSupport = requireNonNull(bootstrapSupport);
+ this.dtcs = requireNonNull(dtcs);
+ this.listenPort = listenPort;
+ }
+
+ @Override
+ public ServiceGroupIdentifier getIdentifier() {
+ return SGID;
+ }
+
+ @Override
+ public synchronized void instantiateServiceInstance() {
+ final ChannelFuture future = bootstrapSupport.newServerBootstrap()
+ .option(ChannelOption.SO_BACKLOG, 3)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childHandler(this)
+ .bind(listenPort);
+
+ try {
+ future.sync();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Failed to bind port " + listenPort, e);
+ }
+
+ serverChannel = future.channel();
+ LOG.info("Replication source started on port {}", listenPort);
+ }
+
+ @Override
+ public synchronized ListenableFuture<?> closeServiceInstance() {
+ LOG.info("Replication source on port {} shutting down", listenPort);
+
+ final List<ListenableFuture<Void>> futures = new ArrayList<>();
+
+ // Close server channel
+ futures.add(closeChannel(serverChannel));
+ serverChannel = null;
+
+ // Close all child channels
+ for (SocketChannel channel : children) {
+ futures.add(closeChannel(channel));
+ }
+ children.clear();
+
+ final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
+ ret.addListener(() -> {
+ LOG.info("Replication source on port {} shut down", listenPort);
+ }, MoreExecutors.directExecutor());
+ return ret;
+ }
+
+ @Override
+ public synchronized void initChannel(final SocketChannel ch) {
+ if (serverChannel == null) {
+ LOG.debug("Channel {} established while shutting down, closing it", ch);
+ ch.close();
+ return;
+ }
+
+ ch.pipeline()
+ .addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("requestHandler", new SourceRequestHandler(dtcs))
+ .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
+ .addLast("frameEncoder", MessageFrameEncoder.instance());
+ children.add(ch);
+
+ LOG.debug("Channel {} established", ch);
+ }
+
+ private static ListenableFuture<Void> closeChannel(final Channel ch) {
+ final SettableFuture<Void> ret = SettableFuture.create();
+ ch.closeFuture().addListener(chf -> {
+ final Throwable cause = chf.cause();
+ if (cause != null) {
+ ret.setException(cause);
+ } else {
+ ret.set(null);
+ }
+ });
+
+ ch.close();
+ return ret;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An OutputStream which makes sure to slice messages to a maximum size. This prevents array reallocations and
+ * GC thrashing on huge objects.
+ */
+final class SplittingOutputStream extends OutputStream {
+ private static final int INIT_BUF = 4096;
+
+ static {
+ verify(INIT_BUF <= Constants.LENGTH_FIELD_MAX);
+ }
+
+ private final List<Object> out;
+
+ private ByteBuf buf;
+
+ SplittingOutputStream(final List<Object> out) {
+ this.out = requireNonNull(out);
+ allocBuffer();
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:parameterName")
+ public void write(final int b) throws IOException {
+ buf.writeByte(b);
+ checkSend();
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:parameterName")
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ Objects.checkFromIndexSize(off, len, b.length);
+
+ int left = len;
+ int ptr = off;
+ while (left > 0) {
+ final int chunk = Math.min(Constants.LENGTH_FIELD_MAX - buf.writerIndex(), left);
+
+ buf.writeBytes(b, ptr, chunk);
+ ptr += chunk;
+ left -= chunk;
+ checkSend();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (buf.writerIndex() != 0) {
+ out.add(buf);
+ }
+ buf = null;
+ }
+
+ private void allocBuffer() {
+ // FIXME: use buffer allocator?
+ buf = Unpooled.buffer(INIT_BUF, Constants.LENGTH_FIELD_MAX);
+ buf.writeByte(Constants.MSG_DTC_CHUNK);
+ }
+
+ private void checkSend() {
+ if (buf.writerIndex() == Constants.LENGTH_FIELD_MAX) {
+ out.add(buf);
+ allocBuffer();
+ }
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
+ <bean id="bootstrapSupport" class="org.opendaylight.mdsal.replicate.netty.AbstractBootstrapSupport"
+ factory-method="create" destroy-method="close"/>
+ <service ref="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
+</blueprint>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+ xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+ xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+ <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.sink" update-strategy="reload">
+ <cm:default-properties>
+ <cm:property name="enabled" value="false"/>
+ <cm:property name="source-host" value="127.0.0.1"/>
+ <cm:property name="source-port" value="9999"/>
+ <cm:property name="reconnect-delay-millis" value="3000"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
+
+ <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
+ <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
+ odl:type="default"/>
+ <reference id="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
+
+ <bean id="reconnectDelay" class="java.time.Duration" factory-method="ofMillis">
+ <argument value="${reconnect-delay-millis}"/>
+ </bean>
+
+ <bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
+ <argument value="${source-host}"/>
+ </bean>
+
+ <bean id="nettyReplicationSink" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
+ factory-method="createSink" destroy-method="close">
+ <argument ref="bootstrapSupport"/>
+ <argument ref="dataBroker"/>
+ <argument ref="singletonServiceProvider"/>
+ <argument value="${enabled}"/>
+ <argument ref="sourceAddress"/>
+ <argument value="${source-port}"/>
+ <argument ref="reconnectDelay"/>
+ </bean>
+
+</blueprint>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+ xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+ xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+ <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.source" update-strategy="reload">
+ <cm:default-properties>
+ <cm:property name="enabled" value="false"/>
+ <cm:property name="listen-port" value="9999"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
+
+ <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
+ <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
+ odl:type="default"/>
+ <reference id="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
+
+ <bean id="nettyReplicationSource" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
+ factory-method="createSource" destroy-method="close">
+ <argument ref="bootstrapSupport"/>
+ <argument ref="dataBroker"/>
+ <argument ref="singletonServiceProvider"/>
+ <argument value="${enabled}"/>
+ <argument value="${listen-port}"/>
+ </bean>
+</blueprint>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.odlparent</groupId>
+ <artifactId>odlparent-lite</artifactId>
+ <version>6.0.7</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-replicate</artifactId>
+ <version>5.0.12-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <properties>
+ <maven.deploy.skip>true</maven.deploy.skip>
+ <maven.install.skip>true</maven.install.skip>
+ </properties>
+
+ <modules>
+ <module>mdsal-replicate-common</module>
+ <module>mdsal-replicate-netty</module>
+ </modules>
+</project>