Add basic netty replication utility 15/90515/4
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 17 Jun 2020 10:29:12 +0000 (12:29 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 22 Jun 2020 08:54:18 +0000 (10:54 +0200)
This adds a source/sink datastore replication component based on
Netty TCP channels. The sink (consumer) connects to source (producer)
via a channel, specifies which data tree it wants replicated and
then listens for DataTreeCandidates to arrive from the source.

Change-Id: Ib283baa9a186ae2fb4ccf909b257006d4645de37
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit d7b666857c54c07d9bf5c8e5e38671151d89fb4c)

32 files changed:
artifacts/pom.xml
docs/pom.xml
features/features-mdsal-experimental/pom.xml
features/odl-mdsal-exp-replicate-common/pom.xml [new file with mode: 0644]
features/odl-mdsal-exp-replicate-netty/pom.xml [new file with mode: 0644]
features/pom.xml
pom.xml
replicate/mdsal-replicate-common/pom.xml [new file with mode: 0644]
replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMDataBrokerModification.java [new file with mode: 0644]
replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMStoreModification.java [new file with mode: 0644]
replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DataTreeCandidateUtils.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/pom.xml [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractSourceMessage.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/BootstrapSupport.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/DeltaEncoder.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/EpollBootstrapSupport.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameDecoder.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NioBootstrapSupport.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkTransactionChainListener.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SplittingOutputStream.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml [new file with mode: 0644]
replicate/pom.xml [new file with mode: 0644]

index b5a508e5d054de1a273b1d2dac6f457cb5c85cdd..0bdb2991eb3c5cfece79b7cc4e06426f9f55d432 100644 (file)
                 <type>xml</type>
             </dependency>
 
+            <!-- Replication -->
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>mdsal-replicate-common</artifactId>
+                <version>4.0.15-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>mdsal-replicate-netty</artifactId>
+                <version>4.0.15-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+                <version>4.0.15-SNAPSHOT</version>
+                <classifier>features</classifier>
+                <type>xml</type>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>odl-mdsal-exp-replicate-netty</artifactId>
+                <version>4.0.15-SNAPSHOT</version>
+                <classifier>features</classifier>
+                <type>xml</type>
+            </dependency>
+
             <!-- MODELS -->
             <dependency>
                 <groupId>org.opendaylight.mdsal.model</groupId>
index 0135adc70e8a8b3da3e57e5f887226c86f251780..d79e0ebe8c95fdc2b844360d1055cdd6a059fb74 100644 (file)
             <artifactId>mdsal-yanglib-rfc8525</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-replicate-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-replicate-netty</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.opendaylight.mdsal.model</groupId>
             <artifactId>opendaylight-l2-types</artifactId>
index 2e97ff1b4371b93dc35a3de464b47fa14eb3611a..c6077faf905872dc277a0b240585a54394f3877e 100644 (file)
             <classifier>features</classifier>
             <type>xml</type>
         </dependency>
+
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+            <classifier>features</classifier>
+            <type>xml</type>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>odl-mdsal-exp-replicate-netty</artifactId>
+            <classifier>features</classifier>
+            <type>xml</type>
+        </dependency>
     </dependencies>
 </project>
diff --git a/features/odl-mdsal-exp-replicate-common/pom.xml b/features/odl-mdsal-exp-replicate-common/pom.xml
new file mode 100644 (file)
index 0000000..e0e49c6
--- /dev/null
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.mdsal</groupId>
+        <artifactId>feature-parent</artifactId>
+        <version>4.0.15-SNAPSHOT</version>
+        <relativePath>../feature-parent</relativePath>
+    </parent>
+
+    <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+    <packaging>feature</packaging>
+
+    <name>OpenDaylight :: MD-SAL :: Replicate :: Common</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>odl-mdsal-dom-api</artifactId>
+            <classifier>features</classifier>
+            <type>xml</type>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-replicate-common</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/features/odl-mdsal-exp-replicate-netty/pom.xml b/features/odl-mdsal-exp-replicate-netty/pom.xml
new file mode 100644 (file)
index 0000000..07fd3d1
--- /dev/null
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.mdsal</groupId>
+        <artifactId>feature-parent</artifactId>
+        <version>4.0.15-SNAPSHOT</version>
+        <relativePath>../feature-parent</relativePath>
+    </parent>
+
+    <artifactId>odl-mdsal-exp-replicate-netty</artifactId>
+    <packaging>feature</packaging>
+
+    <name>OpenDaylight :: MD-SAL :: Replicate :: Netty</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>odl-mdsal-exp-replicate-common</artifactId>
+            <classifier>features</classifier>
+            <type>xml</type>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-replicate-netty</artifactId>
+        </dependency>
+    </dependencies>
+</project>
index 26c4de36dff14b87b0260d4a71ad0139c94e0fbb..49582b36e08fc4b3fabaae687ebc728156a910a3 100644 (file)
         <module>odl-mdsal-exp-yanglib-rfc7895</module>
         <module>odl-mdsal-exp-yanglib-rfc8525</module>
 
+        <!-- Datastore replication -->
+        <module>odl-mdsal-exp-replicate-common</module>
+        <module>odl-mdsal-exp-replicate-netty</module>
+
         <!-- Models -->
         <!-- Standards -->
         <module>odl-mdsal-model-rfc6991</module>
diff --git a/pom.xml b/pom.xml
index 1590edd2c9fd40b99df39ad3656ea214b97e2549..f5927e747d948c01fb7ec8fdaa4414b359f13f74 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -35,6 +35,9 @@
         <module>singleton-service</module>
         <module>trace</module>
 
+        <!-- Data store replicators -->
+        <module>replicate</module>
+
         <!-- IETF YANG (Module) Library -->
         <module>yanglib</module>
     </modules>
diff --git a/replicate/mdsal-replicate-common/pom.xml b/replicate/mdsal-replicate-common/pom.xml
new file mode 100644 (file)
index 0000000..1e0e183
--- /dev/null
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.mdsal</groupId>
+        <artifactId>dom-parent</artifactId>
+        <version>4.0.15-SNAPSHOT</version>
+        <relativePath>../../dom/dom-parent</relativePath>
+    </parent>
+
+    <artifactId>mdsal-replicate-common</artifactId>
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-codec-binfmt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-dom-spi</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-singleton-common-api</artifactId>
+        </dependency>
+    </dependencies>
+
+    <scm>
+        <connection>scm:git:http://git.opendaylight.org/gerrit/mdsal.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/mdsal.git</developerConnection>
+        <tag>HEAD</tag>
+        <url>https://wiki.opendaylight.org/view/MD-SAL:Main</url>
+    </scm>
+</project>
diff --git a/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMDataBrokerModification.java b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMDataBrokerModification.java
new file mode 100644 (file)
index 0000000..40e2e69
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.common;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMDataBrokerModification implements DataTreeModification {
+    private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerModification.class);
+
+    private final DOMDataTreeWriteOperations transaction;
+    private final LogicalDatastoreType datastore;
+
+    DOMDataBrokerModification(final DOMDataTreeWriteOperations transaction, final LogicalDatastoreType datastore) {
+        this.transaction = requireNonNull(transaction);
+        this.datastore = requireNonNull(datastore);
+    }
+
+    @Override
+    public void delete(final YangInstanceIdentifier path) {
+        LOG.trace("BackupModification - DELETE - {}", path);
+        transaction.delete(datastore, path);
+    }
+
+    @Override
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        LOG.trace("BackupModification - WRITE - {} - DATA: {}", path, data);
+        transaction.put(datastore, path, data);
+    }
+
+    @Override
+    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void ready() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void applyToCursor(final DataTreeModificationCursor cursor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DataTreeModification newModification() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SchemaContext getSchemaContext() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMStoreModification.java b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMStoreModification.java
new file mode 100644 (file)
index 0000000..6d1d721
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.common;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMStoreModification implements DataTreeModification {
+    private static final Logger LOG = LoggerFactory.getLogger(DOMStoreModification.class);
+
+    private final DOMStoreWriteTransaction transaction;
+
+    DOMStoreModification(final DOMStoreWriteTransaction transaction) {
+        this.transaction = requireNonNull(transaction);
+    }
+
+    @Override
+    public void delete(final YangInstanceIdentifier path) {
+        LOG.trace("Delete {}", path);
+        transaction.delete(path);
+    }
+
+    @Override
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        LOG.trace("Write {} data {}", path, data);
+        transaction.write(path, data);
+    }
+
+    @Override
+    public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public @NonNull DataTreeModification newModification() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SchemaContext getSchemaContext() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void ready() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void applyToCursor(@NonNull final DataTreeModificationCursor cursor) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DataTreeCandidateUtils.java b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DataTreeCandidateUtils.java
new file mode 100644 (file)
index 0000000..d7e5d5f
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.common;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+
+@Beta
+public final class DataTreeCandidateUtils {
+    private DataTreeCandidateUtils() {
+        // Hidden on purpose
+    }
+
+    public static void applyToTransaction(final DOMDataTreeWriteOperations transaction,
+            final LogicalDatastoreType datastore, final DataTreeCandidate candidate) {
+        DataTreeCandidates.applyToModification(new DOMDataBrokerModification(transaction, datastore), candidate);
+    }
+
+    public static void applyToTransaction(final DOMStoreWriteTransaction transaction,
+            final DataTreeCandidate candidate) {
+        DataTreeCandidates.applyToModification(new DOMStoreModification(transaction), candidate);
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/pom.xml b/replicate/mdsal-replicate-netty/pom.xml
new file mode 100644 (file)
index 0000000..1b4b00e
--- /dev/null
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.mdsal</groupId>
+        <artifactId>dom-parent</artifactId>
+        <version>4.0.15-SNAPSHOT</version>
+        <relativePath>../../dom/dom-parent</relativePath>
+    </parent>
+
+    <artifactId>mdsal-replicate-netty</artifactId>
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-replicate-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+    </dependencies>
+
+    <scm>
+        <connection>scm:git:http://git.opendaylight.org/gerrit/mdsal.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/mdsal.git</developerConnection>
+        <tag>HEAD</tag>
+        <url>https://wiki.opendaylight.org/view/MD-SAL:Main</url>
+    </scm>
+</project>
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java
new file mode 100644 (file)
index 0000000..926a776
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.socket.ServerSocketChannel;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.NonNull;
+
+public abstract class AbstractBootstrapSupport implements AutoCloseable, BootstrapSupport {
+    private final Class<? extends Channel> channelClass;
+    private final Class<? extends ServerSocketChannel> serverChannelClass;
+    private final EventLoopGroup bossGroup;
+    private final EventLoopGroup workerGroup;
+
+    AbstractBootstrapSupport(final Class<? extends Channel> channelClass,
+            final Class<? extends ServerSocketChannel> serverChannelClass, final EventLoopGroup bossGroup,
+            final EventLoopGroup workerGroup) {
+        this.channelClass = requireNonNull(channelClass);
+        this.serverChannelClass = requireNonNull(serverChannelClass);
+        this.bossGroup = requireNonNull(bossGroup);
+        this.workerGroup = requireNonNull(workerGroup);
+    }
+
+    public static @NonNull BootstrapSupport create() {
+        if (Epoll.isAvailable()) {
+            return new EpollBootstrapSupport();
+        }
+        return new NioBootstrapSupport();
+    }
+
+    @Override
+    public final Bootstrap newBootstrap() {
+        return new Bootstrap().group(workerGroup).channel(channelClass);
+    }
+
+    @Override
+    public  final ServerBootstrap newServerBootstrap() {
+        return new ServerBootstrap().group(bossGroup, workerGroup).channel(serverChannelClass);
+    }
+
+    @Override
+    public final void close() throws InterruptedException {
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+
+        bossGroup.awaitTermination(10, TimeUnit.SECONDS);
+        workerGroup.awaitTermination(10, TimeUnit.SECONDS);
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractSourceMessage.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractSourceMessage.java
new file mode 100644 (file)
index 0000000..0df9b42
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+
+abstract class AbstractSourceMessage {
+    private static final class Empty extends AbstractSourceMessage {
+        @Override
+        void encodeTo(final NormalizedNodeStreamVersion version, final List<Object> out) throws IOException {
+            out.add(Constants.EMPTY_DATA);
+        }
+    }
+
+    private static final class Deltas extends AbstractSourceMessage {
+        private final Collection<DataTreeCandidate> deltas;
+
+        Deltas(final Collection<DataTreeCandidate> deltas) {
+            this.deltas = requireNonNull(deltas);
+        }
+
+        @Override
+        void encodeTo(final NormalizedNodeStreamVersion version, final List<Object> out) throws IOException {
+            for (DataTreeCandidate candidate : deltas) {
+                try (DataOutputStream stream = new DataOutputStream(new SplittingOutputStream(out))) {
+                    try (NormalizedNodeDataOutput output = version.newDataOutput(stream)) {
+                        DataTreeCandidateInputOutput.writeDataTreeCandidate(output, candidate);
+                    }
+                }
+                out.add(Constants.DTC_APPLY);
+            }
+        }
+    }
+
+    private static final AbstractSourceMessage EMPTY = new Empty();
+
+    static AbstractSourceMessage empty() {
+        return EMPTY;
+    }
+
+    static AbstractSourceMessage of(final Collection<DataTreeCandidate> deltas) {
+        return new Deltas(deltas);
+    }
+
+    abstract void encodeTo(NormalizedNodeStreamVersion version, List<Object> out) throws IOException;
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/BootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/BootstrapSupport.java
new file mode 100644 (file)
index 0000000..10c1be4
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import com.google.common.annotations.Beta;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import org.eclipse.jdt.annotation.NonNull;
+
+@Beta
+public interface BootstrapSupport {
+
+    @NonNull Bootstrap newBootstrap();
+
+    @NonNull ServerBootstrap newServerBootstrap();
+}
\ No newline at end of file
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java
new file mode 100644 (file)
index 0000000..eddf8d6
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+final class Constants {
+    /**
+     * Subscribe request message. This is the only valid initial message in the sink->source direction. Its payload is
+     * composed of a binary normalized node stream. The stream must contain a {@link LogicalDatastoreType} serialized
+     * via {@link LogicalDatastoreType#writeTo(java.io.DataOutput)} followed by a single {@link YangInstanceIdentifier}.
+     */
+    static final byte MSG_SUBSCRIBE_REQ = 1;
+    /**
+     * Initial data indicating non-presence of the subscribed path.
+     */
+    static final byte MSG_EMPTY_DATA    = 2;
+    /**
+     * A chunk of the DataTreeCandidate serialization stream. May be followed by another chunk or
+     * {@link #MSG_DTC_APPLY}.
+     */
+    static final byte MSG_DTC_CHUNK     = 3;
+    /**
+     * End-of-DataTreeCandidate serialization stream. The payload is empty.
+     */
+    static final byte MSG_DTC_APPLY     = 4;
+
+    /**
+     * Length of the length field in each transmitted frame.
+     */
+    static final int LENGTH_FIELD_LENGTH = 4;
+    /**
+     * Maximum frame size allowed by encoding, 1MiB.
+     */
+    static final int LENGTH_FIELD_MAX    = 1024 * 1024;
+
+    static final ByteBuf EMPTY_DATA = Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA });
+    static final ByteBuf DTC_APPLY = Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY });
+
+    private Constants() {
+        // Hidden on purpose
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/DeltaEncoder.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/DeltaEncoder.java
new file mode 100644 (file)
index 0000000..96d6740
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+
+final class DeltaEncoder extends MessageToMessageEncoder<AbstractSourceMessage> {
+    private final NormalizedNodeStreamVersion version;
+
+    DeltaEncoder(final NormalizedNodeStreamVersion version) {
+        this.version = requireNonNull(version);
+    }
+
+    @Override
+    protected void encode(final ChannelHandlerContext ctx, final AbstractSourceMessage msg, final List<Object> out)
+            throws IOException {
+        msg.encodeTo(version, out);
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/EpollBootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/EpollBootstrapSupport.java
new file mode 100644 (file)
index 0000000..f5527c4
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+
+final class EpollBootstrapSupport extends AbstractBootstrapSupport {
+    EpollBootstrapSupport() {
+        super(EpollSocketChannel.class, EpollServerSocketChannel.class, new EpollEventLoopGroup(),
+            new EpollEventLoopGroup());
+    }
+}
\ No newline at end of file
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameDecoder.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameDecoder.java
new file mode 100644 (file)
index 0000000..876847e
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+final class MessageFrameDecoder extends LengthFieldBasedFrameDecoder {
+    MessageFrameDecoder() {
+        super(Constants.LENGTH_FIELD_MAX, 0, Constants.LENGTH_FIELD_LENGTH, 0, Constants.LENGTH_FIELD_LENGTH);
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java
new file mode 100644 (file)
index 0000000..f6d9347
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+@Sharable
+final class MessageFrameEncoder extends LengthFieldPrepender {
+    private static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder();
+
+    private MessageFrameEncoder() {
+        super(Constants.LENGTH_FIELD_LENGTH);
+    }
+
+    static MessageFrameEncoder instance() {
+        return INSTANCE;
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java
new file mode 100644 (file)
index 0000000..7e396e8
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Verify.verify;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.yangtools.concepts.AbstractRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+
+public final class NettyReplication {
+    private static final class Disabled extends AbstractRegistration {
+        @Override
+        protected void removeRegistration() {
+            // no-op
+        }
+    }
+
+    private NettyReplication() {
+        // Hidden on purpose
+    }
+
+    public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
+            final ClusterSingletonServiceProvider singletonService, final boolean enabled,
+            final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) {
+        return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
+            dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled();
+    }
+
+    public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
+            final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) {
+        final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
+        verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker);
+
+        return enabled ? singletonService.registerClusterSingletonService(new SourceSingletonService(bootstrapSupport,
+            dtcs, listenPort)) : new Disabled();
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NioBootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NioBootstrapSupport.java
new file mode 100644 (file)
index 0000000..116aa6c
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+final class NioBootstrapSupport extends AbstractBootstrapSupport {
+    NioBootstrapSupport() {
+        super(NioSocketChannel.class, NioServerSocketChannel.class, new NioEventLoopGroup(), new NioEventLoopGroup());
+    }
+}
\ No newline at end of file
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java
new file mode 100644 (file)
index 0000000..7ead9c2
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.replicate.common.DataTreeCandidateUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SinkRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkRequestHandler.class);
+
+    private final ReusableStreamReceiver receiver = ReusableImmutableNormalizedNodeStreamWriter.create();
+    private final List<ByteBuf> chunks = new ArrayList<>();
+    private final DOMDataTreeIdentifier tree;
+    private final DOMTransactionChain chain;
+
+    SinkRequestHandler(final DOMDataTreeIdentifier tree, final DOMTransactionChain chain) {
+        this.tree = requireNonNull(tree);
+        this.chain = requireNonNull(chain);
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws IOException {
+        verify(msg.isReadable(), "Empty message received");
+
+        final short msgType = msg.readUnsignedByte();
+        final Channel channel = ctx.channel();
+        LOG.trace("Channel {} received message type {}", channel, msgType);
+        switch (msgType) {
+            case Constants.MSG_EMPTY_DATA:
+                handleEmptyData();
+                break;
+            case Constants.MSG_DTC_CHUNK:
+                chunks.add(msg);
+                break;
+            case Constants.MSG_DTC_APPLY:
+                handleDtcApply();
+                break;
+            default:
+                throw new IllegalStateException("Unexpected message type " + msgType);
+        }
+    }
+
+    private void handleEmptyData() {
+        final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction();
+        tx.delete(tree.getDatastoreType(), tree.getRootIdentifier());
+        commit(tx);
+    }
+
+    private void handleDtcApply() throws IOException {
+        checkState(!chunks.isEmpty(), "No chunks to apply");
+
+        final ByteBuf bufs = Unpooled.wrappedBuffer(chunks.toArray(new ByteBuf[0]));
+        chunks.clear();
+
+        final DataTreeCandidate candidate;
+        try (ByteBufInputStream stream = new ByteBufInputStream(bufs)) {
+            candidate = DataTreeCandidateInputOutput.readDataTreeCandidate(NormalizedNodeDataInput.newDataInput(stream),
+                receiver);
+        }
+
+        final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction();
+        DataTreeCandidateUtils.applyToTransaction(tx, tree.getDatastoreType(), candidate);
+        commit(tx);
+    }
+
+    private static void commit(final DOMDataTreeWriteTransaction tx) {
+        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+            @Override
+            public void onSuccess(final CommitInfo result) {
+                LOG.trace("Transaction committed with {}", result);
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                // Handled by transaction chain listener
+            }
+        }, MoreExecutors.directExecutor());
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java
new file mode 100644 (file)
index 0000000..c2b5f4a
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.util.concurrent.Future;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SinkSingletonService implements ClusterSingletonService {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
+    private static final ServiceGroupIdentifier SGID =
+            ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
+    // TODO: allow different trees?
+    private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+        YangInstanceIdentifier.empty());
+    private static final ByteBuf TREE_REQUEST;
+
+    static {
+        try {
+            TREE_REQUEST = requestTree(TREE);
+        } catch (IOException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    private final BootstrapSupport bootstrapSupport;
+    private final DOMDataBroker dataBroker;
+    private final InetSocketAddress sourceAddress;
+    private final Duration reconnectDelay;
+
+    @GuardedBy("this")
+    private ChannelFuture futureChannel;
+
+    SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
+            final InetSocketAddress sourceAddress, final Duration reconnectDelay) {
+        this.bootstrapSupport = requireNonNull(bootstrapSupport);
+        this.dataBroker = requireNonNull(dataBroker);
+        this.sourceAddress = requireNonNull(sourceAddress);
+        this.reconnectDelay = requireNonNull(reconnectDelay);
+    }
+
+    @Override
+    public ServiceGroupIdentifier getIdentifier() {
+        return SGID;
+    }
+
+    @Override
+    public synchronized void instantiateServiceInstance() {
+        LOG.info("Replication sink started with source {}", sourceAddress);
+
+        final Bootstrap bs = bootstrapSupport.newBootstrap();
+        final ScheduledExecutorService group = bs.config().group();
+
+        futureChannel = bs
+                .option(ChannelOption.SO_KEEPALIVE, true)
+                .connect(sourceAddress, null);
+
+        futureChannel.addListener(compl -> channelResolved(compl, group));
+    }
+
+    @Override
+    public synchronized ListenableFuture<?> closeServiceInstance() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
+        if (completedFuture != futureChannel) {
+            // Future changed, this callback is irrelevant
+            return;
+        }
+
+        final Channel channel = futureChannel.channel();
+        channel.pipeline()
+            .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
+                new SinkTransactionChainListener(channel))))
+            .addLast("frameEncoder", MessageFrameEncoder.instance());
+
+        channel.writeAndFlush(TREE_REQUEST);
+    }
+
+    private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
+        final ByteBuf ret = Unpooled.buffer();
+
+        try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
+            try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
+                tree.getDatastoreType().writeTo(output);
+                output.writeYangInstanceIdentifier(tree.getRootIdentifier());
+            }
+        }
+
+        return ret;
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkTransactionChainListener.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkTransactionChainListener.java
new file mode 100644 (file)
index 0000000..8b6c139
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.Channel;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SinkTransactionChainListener implements DOMTransactionChainListener {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTransactionChainListener.class);
+
+    private final Channel channel;
+
+    SinkTransactionChainListener(final Channel channel) {
+        this.channel = requireNonNull(channel);
+    }
+
+    @Override
+    public void onTransactionChainFailed(final DOMTransactionChain chain, final DOMDataTreeTransaction transaction,
+            final Throwable cause) {
+        LOG.error("Transaction chain for channel {} failed", channel, cause);
+        channel.close();
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(final DOMTransactionChain chain) {
+        LOG.info("Transaction chain for channel {} completed", channel);
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java
new file mode 100644 (file)
index 0000000..f302ca1
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.io.IOException;
+import java.util.Collection;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Final inbound handler on source side. Handles requests coming from sink and reacts to them.
+ */
+final class SourceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceRequestHandler.class);
+
+    private final DOMDataTreeChangeService dtcs;
+
+    private ListenerRegistration<?> reg;
+
+    SourceRequestHandler(final DOMDataTreeChangeService dtcs) {
+        this.dtcs = requireNonNull(dtcs);
+    }
+
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) {
+        LOG.trace("Channel {} going inactive", ctx.channel());
+        if (reg != null) {
+            reg.close();
+            reg = null;
+        }
+        ctx.fireChannelInactive();
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws IOException {
+        verify(msg.isReadable(), "Empty message received");
+
+        final short msgType = msg.readUnsignedByte();
+        final Channel channel = ctx.channel();
+        LOG.trace("Channel {} received message type {}", channel, msgType);
+        switch (msgType) {
+            case Constants.MSG_SUBSCRIBE_REQ:
+                subscribe(channel, msg);
+                break;
+            default:
+                throw new IllegalStateException("Unexpected message type " + msgType);
+        }
+    }
+
+    private void subscribe(final Channel channel, final ByteBuf msg) throws IOException {
+        verify(reg == null, "Unexpected subscription when already subscribed");
+
+        final DOMDataTreeIdentifier dataTree;
+        try (ByteBufInputStream input = new ByteBufInputStream(msg)) {
+            final NormalizedNodeDataInput normalizedInput = NormalizedNodeDataInput.newDataInput(input);
+
+            dataTree = new DOMDataTreeIdentifier(LogicalDatastoreType.readFrom(normalizedInput),
+                normalizedInput.readYangInstanceIdentifier());
+        }
+
+        LOG.info("Channel {} subscribing to {}", channel, dataTree);
+        reg = dtcs.registerDataTreeChangeListener(dataTree, new ClusteredDOMDataTreeChangeListener() {
+            @Override
+            public void onInitialData() {
+                channel.writeAndFlush(AbstractSourceMessage.empty());
+            }
+
+            @Override
+            public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
+                channel.writeAndFlush(AbstractSourceMessage.of(changes));
+            }
+        });
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java
new file mode 100644 (file)
index 0000000..4156372
--- /dev/null
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
+ * for a particular port.
+ */
+final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
+    private static final ServiceGroupIdentifier SGID =
+            ServiceGroupIdentifier.create(SourceSingletonService.class.getName());
+
+    private final BootstrapSupport bootstrapSupport;
+    private final DOMDataTreeChangeService dtcs;
+    private final int listenPort;
+
+    @GuardedBy("this")
+    private final Collection<SocketChannel> children = new HashSet<>();
+    @GuardedBy("this")
+    private Channel serverChannel;
+
+    SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
+            final int listenPort) {
+        this.bootstrapSupport = requireNonNull(bootstrapSupport);
+        this.dtcs = requireNonNull(dtcs);
+        this.listenPort = listenPort;
+    }
+
+    @Override
+    public ServiceGroupIdentifier getIdentifier() {
+        return SGID;
+    }
+
+    @Override
+    public synchronized void instantiateServiceInstance() {
+        final ChannelFuture future = bootstrapSupport.newServerBootstrap()
+                .option(ChannelOption.SO_BACKLOG, 3)
+                .childOption(ChannelOption.SO_KEEPALIVE, true)
+                .childHandler(this)
+                .bind(listenPort);
+
+        try {
+            future.sync();
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("Failed to bind port " + listenPort, e);
+        }
+
+        serverChannel = future.channel();
+        LOG.info("Replication source started on port {}", listenPort);
+    }
+
+    @Override
+    public synchronized ListenableFuture<?> closeServiceInstance() {
+        LOG.info("Replication source on port {} shutting down", listenPort);
+
+        final List<ListenableFuture<Void>> futures = new ArrayList<>();
+
+        // Close server channel
+        futures.add(closeChannel(serverChannel));
+        serverChannel = null;
+
+        // Close all child channels
+        for (SocketChannel channel : children) {
+            futures.add(closeChannel(channel));
+        }
+        children.clear();
+
+        final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
+        ret.addListener(() -> {
+            LOG.info("Replication source on port {} shut down", listenPort);
+        }, MoreExecutors.directExecutor());
+        return ret;
+    }
+
+    @Override
+    public synchronized void initChannel(final SocketChannel ch) {
+        if (serverChannel == null) {
+            LOG.debug("Channel {} established while shutting down, closing it", ch);
+            ch.close();
+            return;
+        }
+
+        ch.pipeline()
+            .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("requestHandler", new SourceRequestHandler(dtcs))
+            .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
+            .addLast("frameEncoder", MessageFrameEncoder.instance());
+        children.add(ch);
+
+        LOG.debug("Channel {} established", ch);
+    }
+
+    private static ListenableFuture<Void> closeChannel(final Channel ch) {
+        final SettableFuture<Void> ret = SettableFuture.create();
+        ch.closeFuture().addListener(chf -> {
+            final Throwable cause = chf.cause();
+            if (cause != null) {
+                ret.setException(cause);
+            } else {
+                ret.set(null);
+            }
+        });
+
+        ch.close();
+        return ret;
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SplittingOutputStream.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SplittingOutputStream.java
new file mode 100644 (file)
index 0000000..a01b7bd
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An OutputStream which makes sure to slice messages to a maximum size. This prevents array reallocations and
+ * GC thrashing on huge objects.
+ */
+final class SplittingOutputStream extends OutputStream {
+    private static final int INIT_BUF = 4096;
+
+    static {
+        verify(INIT_BUF <= Constants.LENGTH_FIELD_MAX);
+    }
+
+    private final List<Object> out;
+
+    private ByteBuf buf;
+
+    SplittingOutputStream(final List<Object> out) {
+        this.out = requireNonNull(out);
+        allocBuffer();
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:parameterName")
+    public void write(final int b) throws IOException {
+        buf.writeByte(b);
+        checkSend();
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:parameterName")
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        if (off < 0 || len < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        int left = len;
+        int ptr = off;
+        while (left > 0) {
+            final int chunk = Math.min(Constants.LENGTH_FIELD_MAX - buf.writerIndex(), left);
+
+            buf.writeBytes(b, ptr, chunk);
+            ptr += chunk;
+            left -= chunk;
+            checkSend();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (buf.writerIndex() != 0) {
+            out.add(buf);
+        }
+        buf = null;
+    }
+
+    private void allocBuffer() {
+        // FIXME: use buffer allocator?
+        buf = Unpooled.buffer(INIT_BUF, Constants.LENGTH_FIELD_MAX);
+        buf.writeByte(Constants.MSG_DTC_CHUNK);
+    }
+
+    private void checkSend() {
+        if (buf.writerIndex() == Constants.LENGTH_FIELD_MAX) {
+            out.add(buf);
+            allocBuffer();
+        }
+    }
+}
diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml
new file mode 100644 (file)
index 0000000..00dc595
--- /dev/null
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
+  <bean id="bootstrapSupport" class="org.opendaylight.mdsal.replicate.netty.AbstractBootstrapSupport"
+               factory-method="create" destroy-method="close"/>
+  <service ref="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
+</blueprint>
diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml
new file mode 100644 (file)
index 0000000..9a76c3d
--- /dev/null
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+  <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.sink" update-strategy="reload">
+    <cm:default-properties>
+      <cm:property name="enabled" value="false"/>
+      <cm:property name="source-host" value="127.0.0.1"/>
+      <cm:property name="source-port" value="9999"/>
+      <cm:property name="reconnect-delay-millis" value="3000"/>
+    </cm:default-properties>
+  </cm:property-placeholder>
+
+  <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
+  <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
+             odl:type="default"/>
+  <reference id="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
+
+  <bean id="reconnectDelay" class="java.time.Duration" factory-method="ofMillis">
+    <argument value="${reconnect-delay-millis}"/>
+  </bean>
+
+  <bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
+    <argument value="${source-host}"/>
+  </bean>
+
+  <bean id="nettyReplicationSink" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
+        factory-method="createSink" destroy-method="close">
+    <argument ref="bootstrapSupport"/>
+    <argument ref="dataBroker"/>
+    <argument ref="singletonServiceProvider"/>
+    <argument value="${enabled}"/>
+    <argument ref="sourceAddress"/>
+    <argument value="${source-port}"/>
+    <argument ref="reconnectDelay"/>
+  </bean>
+
+</blueprint>
diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml
new file mode 100644 (file)
index 0000000..14a2d4c
--- /dev/null
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+  <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.source" update-strategy="reload">
+    <cm:default-properties>
+      <cm:property name="enabled" value="false"/>
+      <cm:property name="listen-port" value="9999"/>
+    </cm:default-properties>
+  </cm:property-placeholder>
+
+  <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
+  <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
+             odl:type="default"/>
+  <reference id="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
+
+  <bean id="nettyReplicationSource" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
+        factory-method="createSource" destroy-method="close">
+    <argument ref="bootstrapSupport"/>
+    <argument ref="dataBroker"/>
+    <argument ref="singletonServiceProvider"/>
+    <argument value="${enabled}"/>
+    <argument value="${listen-port}"/>
+  </bean>
+</blueprint>
diff --git a/replicate/pom.xml b/replicate/pom.xml
new file mode 100644 (file)
index 0000000..6920154
--- /dev/null
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.odlparent</groupId>
+        <artifactId>odlparent-lite</artifactId>
+        <version>5.0.8</version>
+        <relativePath/>
+    </parent>
+
+    <groupId>org.opendaylight.mdsal</groupId>
+    <artifactId>mdsal-replicate</artifactId>
+    <version>4.0.15-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <properties>
+        <maven.deploy.skip>true</maven.deploy.skip>
+        <maven.install.skip>true</maven.install.skip>
+    </properties>
+
+    <modules>
+        <module>mdsal-replicate-common</module>
+        <module>mdsal-replicate-netty</module>
+    </modules>
+</project>