From: Robert Varga Date: Wed, 17 Jun 2020 10:29:12 +0000 (+0200) Subject: Add basic netty replication utility X-Git-Tag: v4.0.15~2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=mdsal.git;a=commitdiff_plain;h=cd61eab688c3bbd7d54d04b2c72021838ee3ca71 Add basic netty replication utility This adds a source/sink datastore replication component based on Netty TCP channels. The sink (consumer) connects to source (producer) via a channel, specifies which data tree it wants replicated and then listens for DataTreeCandidates to arrive from the source. Change-Id: Ib283baa9a186ae2fb4ccf909b257006d4645de37 Signed-off-by: Tibor Král Signed-off-by: Robert Varga (cherry picked from commit d7b666857c54c07d9bf5c8e5e38671151d89fb4c) --- diff --git a/artifacts/pom.xml b/artifacts/pom.xml index b5a508e5d0..0bdb2991eb 100644 --- a/artifacts/pom.xml +++ b/artifacts/pom.xml @@ -379,6 +379,32 @@ xml + + + ${project.groupId} + mdsal-replicate-common + 4.0.15-SNAPSHOT + + + ${project.groupId} + mdsal-replicate-netty + 4.0.15-SNAPSHOT + + + ${project.groupId} + odl-mdsal-exp-replicate-common + 4.0.15-SNAPSHOT + features + xml + + + ${project.groupId} + odl-mdsal-exp-replicate-netty + 4.0.15-SNAPSHOT + features + xml + + org.opendaylight.mdsal.model diff --git a/docs/pom.xml b/docs/pom.xml index 0135adc70e..d79e0ebe8c 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -188,6 +188,15 @@ mdsal-yanglib-rfc8525 + + org.opendaylight.mdsal + mdsal-replicate-common + + + org.opendaylight.mdsal + mdsal-replicate-netty + + org.opendaylight.mdsal.model opendaylight-l2-types diff --git a/features/features-mdsal-experimental/pom.xml b/features/features-mdsal-experimental/pom.xml index 2e97ff1b43..c6077faf90 100644 --- a/features/features-mdsal-experimental/pom.xml +++ b/features/features-mdsal-experimental/pom.xml @@ -61,5 +61,18 @@ features xml + + + org.opendaylight.mdsal + odl-mdsal-exp-replicate-common + features + xml + + + org.opendaylight.mdsal + odl-mdsal-exp-replicate-netty + features + xml + diff --git a/features/odl-mdsal-exp-replicate-common/pom.xml b/features/odl-mdsal-exp-replicate-common/pom.xml new file mode 100644 index 0000000000..e0e49c6737 --- /dev/null +++ b/features/odl-mdsal-exp-replicate-common/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + org.opendaylight.mdsal + feature-parent + 4.0.15-SNAPSHOT + ../feature-parent + + + odl-mdsal-exp-replicate-common + feature + + OpenDaylight :: MD-SAL :: Replicate :: Common + + + + org.opendaylight.mdsal + odl-mdsal-dom-api + features + xml + + + org.opendaylight.mdsal + mdsal-replicate-common + + + diff --git a/features/odl-mdsal-exp-replicate-netty/pom.xml b/features/odl-mdsal-exp-replicate-netty/pom.xml new file mode 100644 index 0000000000..07fd3d156c --- /dev/null +++ b/features/odl-mdsal-exp-replicate-netty/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + org.opendaylight.mdsal + feature-parent + 4.0.15-SNAPSHOT + ../feature-parent + + + odl-mdsal-exp-replicate-netty + feature + + OpenDaylight :: MD-SAL :: Replicate :: Netty + + + + org.opendaylight.mdsal + odl-mdsal-exp-replicate-common + features + xml + + + org.opendaylight.mdsal + mdsal-replicate-netty + + + diff --git a/features/pom.xml b/features/pom.xml index 26c4de36df..49582b36e0 100644 --- a/features/pom.xml +++ b/features/pom.xml @@ -59,6 +59,10 @@ odl-mdsal-exp-yanglib-rfc7895 odl-mdsal-exp-yanglib-rfc8525 + + odl-mdsal-exp-replicate-common + odl-mdsal-exp-replicate-netty + odl-mdsal-model-rfc6991 diff --git a/pom.xml b/pom.xml index 1590edd2c9..f5927e747d 100644 --- a/pom.xml +++ b/pom.xml @@ -35,6 +35,9 @@ singleton-service trace + + replicate + yanglib diff --git a/replicate/mdsal-replicate-common/pom.xml b/replicate/mdsal-replicate-common/pom.xml new file mode 100644 index 0000000000..1e0e183a94 --- /dev/null +++ b/replicate/mdsal-replicate-common/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + + org.opendaylight.mdsal + dom-parent + 4.0.15-SNAPSHOT + ../../dom/dom-parent + + + mdsal-replicate-common + bundle + + + + org.opendaylight.yangtools + yang-data-codec-binfmt + + + org.opendaylight.mdsal + mdsal-dom-spi + + + org.opendaylight.mdsal + mdsal-singleton-common-api + + + + + scm:git:http://git.opendaylight.org/gerrit/mdsal.git + scm:git:ssh://git.opendaylight.org:29418/mdsal.git + HEAD + https://wiki.opendaylight.org/view/MD-SAL:Main + + diff --git a/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMDataBrokerModification.java b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMDataBrokerModification.java new file mode 100644 index 0000000000..40e2e69712 --- /dev/null +++ b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMDataBrokerModification.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.common; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DOMDataBrokerModification implements DataTreeModification { + private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerModification.class); + + private final DOMDataTreeWriteOperations transaction; + private final LogicalDatastoreType datastore; + + DOMDataBrokerModification(final DOMDataTreeWriteOperations transaction, final LogicalDatastoreType datastore) { + this.transaction = requireNonNull(transaction); + this.datastore = requireNonNull(datastore); + } + + @Override + public void delete(final YangInstanceIdentifier path) { + LOG.trace("BackupModification - DELETE - {}", path); + transaction.delete(datastore, path); + } + + @Override + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + LOG.trace("BackupModification - WRITE - {} - DATA: {}", path, data); + transaction.put(datastore, path, data); + } + + @Override + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + throw new UnsupportedOperationException(); + } + + @Override + public void ready() { + throw new UnsupportedOperationException(); + } + + @Override + public void applyToCursor(final DataTreeModificationCursor cursor) { + throw new UnsupportedOperationException(); + } + + @Override + public Optional> readNode(final YangInstanceIdentifier path) { + throw new UnsupportedOperationException(); + } + + @Override + public DataTreeModification newModification() { + throw new UnsupportedOperationException(); + } + + @Override + public SchemaContext getSchemaContext() { + throw new UnsupportedOperationException(); + } +} diff --git a/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMStoreModification.java b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMStoreModification.java new file mode 100644 index 0000000000..6d1d721b69 --- /dev/null +++ b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DOMStoreModification.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.common; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DOMStoreModification implements DataTreeModification { + private static final Logger LOG = LoggerFactory.getLogger(DOMStoreModification.class); + + private final DOMStoreWriteTransaction transaction; + + DOMStoreModification(final DOMStoreWriteTransaction transaction) { + this.transaction = requireNonNull(transaction); + } + + @Override + public void delete(final YangInstanceIdentifier path) { + LOG.trace("Delete {}", path); + transaction.delete(path); + } + + @Override + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + LOG.trace("Write {} data {}", path, data); + transaction.write(path, data); + } + + @Override + public Optional> readNode(final YangInstanceIdentifier path) { + throw new UnsupportedOperationException(); + } + + @Override + public @NonNull DataTreeModification newModification() { + throw new UnsupportedOperationException(); + } + + @Override + public SchemaContext getSchemaContext() { + throw new UnsupportedOperationException(); + } + + @Override + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + throw new UnsupportedOperationException(); + } + + @Override + public void ready() { + throw new UnsupportedOperationException(); + } + + @Override + public void applyToCursor(@NonNull final DataTreeModificationCursor cursor) { + throw new UnsupportedOperationException(); + } +} diff --git a/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DataTreeCandidateUtils.java b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DataTreeCandidateUtils.java new file mode 100644 index 0000000000..d7e5d5fb93 --- /dev/null +++ b/replicate/mdsal-replicate-common/src/main/java/org/opendaylight/mdsal/replicate/common/DataTreeCandidateUtils.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.common; + +import com.google.common.annotations.Beta; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; + +@Beta +public final class DataTreeCandidateUtils { + private DataTreeCandidateUtils() { + // Hidden on purpose + } + + public static void applyToTransaction(final DOMDataTreeWriteOperations transaction, + final LogicalDatastoreType datastore, final DataTreeCandidate candidate) { + DataTreeCandidates.applyToModification(new DOMDataBrokerModification(transaction, datastore), candidate); + } + + public static void applyToTransaction(final DOMStoreWriteTransaction transaction, + final DataTreeCandidate candidate) { + DataTreeCandidates.applyToModification(new DOMStoreModification(transaction), candidate); + } +} diff --git a/replicate/mdsal-replicate-netty/pom.xml b/replicate/mdsal-replicate-netty/pom.xml new file mode 100644 index 0000000000..1b4b00e1d8 --- /dev/null +++ b/replicate/mdsal-replicate-netty/pom.xml @@ -0,0 +1,49 @@ + + + + 4.0.0 + + + org.opendaylight.mdsal + dom-parent + 4.0.15-SNAPSHOT + ../../dom/dom-parent + + + mdsal-replicate-netty + bundle + + + + org.opendaylight.mdsal + mdsal-replicate-common + + + io.netty + netty-handler + + + io.netty + netty-transport + + + io.netty + netty-transport-native-epoll + true + + + + + + scm:git:http://git.opendaylight.org/gerrit/mdsal.git + scm:git:ssh://git.opendaylight.org:29418/mdsal.git + HEAD + https://wiki.opendaylight.org/view/MD-SAL:Main + + diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java new file mode 100644 index 0000000000..926a7761d3 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static java.util.Objects.requireNonNull; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.socket.ServerSocketChannel; +import java.util.concurrent.TimeUnit; +import org.eclipse.jdt.annotation.NonNull; + +public abstract class AbstractBootstrapSupport implements AutoCloseable, BootstrapSupport { + private final Class channelClass; + private final Class serverChannelClass; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + + AbstractBootstrapSupport(final Class channelClass, + final Class serverChannelClass, final EventLoopGroup bossGroup, + final EventLoopGroup workerGroup) { + this.channelClass = requireNonNull(channelClass); + this.serverChannelClass = requireNonNull(serverChannelClass); + this.bossGroup = requireNonNull(bossGroup); + this.workerGroup = requireNonNull(workerGroup); + } + + public static @NonNull BootstrapSupport create() { + if (Epoll.isAvailable()) { + return new EpollBootstrapSupport(); + } + return new NioBootstrapSupport(); + } + + @Override + public final Bootstrap newBootstrap() { + return new Bootstrap().group(workerGroup).channel(channelClass); + } + + @Override + public final ServerBootstrap newServerBootstrap() { + return new ServerBootstrap().group(bossGroup, workerGroup).channel(serverChannelClass); + } + + @Override + public final void close() throws InterruptedException { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + + bossGroup.awaitTermination(10, TimeUnit.SECONDS); + workerGroup.awaitTermination(10, TimeUnit.SECONDS); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractSourceMessage.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractSourceMessage.java new file mode 100644 index 0000000000..0df9b42ff0 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractSourceMessage.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static java.util.Objects.requireNonNull; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; + +abstract class AbstractSourceMessage { + private static final class Empty extends AbstractSourceMessage { + @Override + void encodeTo(final NormalizedNodeStreamVersion version, final List out) throws IOException { + out.add(Constants.EMPTY_DATA); + } + } + + private static final class Deltas extends AbstractSourceMessage { + private final Collection deltas; + + Deltas(final Collection deltas) { + this.deltas = requireNonNull(deltas); + } + + @Override + void encodeTo(final NormalizedNodeStreamVersion version, final List out) throws IOException { + for (DataTreeCandidate candidate : deltas) { + try (DataOutputStream stream = new DataOutputStream(new SplittingOutputStream(out))) { + try (NormalizedNodeDataOutput output = version.newDataOutput(stream)) { + DataTreeCandidateInputOutput.writeDataTreeCandidate(output, candidate); + } + } + out.add(Constants.DTC_APPLY); + } + } + } + + private static final AbstractSourceMessage EMPTY = new Empty(); + + static AbstractSourceMessage empty() { + return EMPTY; + } + + static AbstractSourceMessage of(final Collection deltas) { + return new Deltas(deltas); + } + + abstract void encodeTo(NormalizedNodeStreamVersion version, List out) throws IOException; +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/BootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/BootstrapSupport.java new file mode 100644 index 0000000000..10c1be4d37 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/BootstrapSupport.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import com.google.common.annotations.Beta; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import org.eclipse.jdt.annotation.NonNull; + +@Beta +public interface BootstrapSupport { + + @NonNull Bootstrap newBootstrap(); + + @NonNull ServerBootstrap newServerBootstrap(); +} \ No newline at end of file diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java new file mode 100644 index 0000000000..eddf8d6dba --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +final class Constants { + /** + * Subscribe request message. This is the only valid initial message in the sink->source direction. Its payload is + * composed of a binary normalized node stream. The stream must contain a {@link LogicalDatastoreType} serialized + * via {@link LogicalDatastoreType#writeTo(java.io.DataOutput)} followed by a single {@link YangInstanceIdentifier}. + */ + static final byte MSG_SUBSCRIBE_REQ = 1; + /** + * Initial data indicating non-presence of the subscribed path. + */ + static final byte MSG_EMPTY_DATA = 2; + /** + * A chunk of the DataTreeCandidate serialization stream. May be followed by another chunk or + * {@link #MSG_DTC_APPLY}. + */ + static final byte MSG_DTC_CHUNK = 3; + /** + * End-of-DataTreeCandidate serialization stream. The payload is empty. + */ + static final byte MSG_DTC_APPLY = 4; + + /** + * Length of the length field in each transmitted frame. + */ + static final int LENGTH_FIELD_LENGTH = 4; + /** + * Maximum frame size allowed by encoding, 1MiB. + */ + static final int LENGTH_FIELD_MAX = 1024 * 1024; + + static final ByteBuf EMPTY_DATA = Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA }); + static final ByteBuf DTC_APPLY = Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY }); + + private Constants() { + // Hidden on purpose + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/DeltaEncoder.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/DeltaEncoder.java new file mode 100644 index 0000000000..96d6740614 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/DeltaEncoder.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static java.util.Objects.requireNonNull; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import java.io.IOException; +import java.util.List; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; + +final class DeltaEncoder extends MessageToMessageEncoder { + private final NormalizedNodeStreamVersion version; + + DeltaEncoder(final NormalizedNodeStreamVersion version) { + this.version = requireNonNull(version); + } + + @Override + protected void encode(final ChannelHandlerContext ctx, final AbstractSourceMessage msg, final List out) + throws IOException { + msg.encodeTo(version, out); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/EpollBootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/EpollBootstrapSupport.java new file mode 100644 index 0000000000..f5527c46ad --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/EpollBootstrapSupport.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; + +final class EpollBootstrapSupport extends AbstractBootstrapSupport { + EpollBootstrapSupport() { + super(EpollSocketChannel.class, EpollServerSocketChannel.class, new EpollEventLoopGroup(), + new EpollEventLoopGroup()); + } +} \ No newline at end of file diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameDecoder.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameDecoder.java new file mode 100644 index 0000000000..876847eefc --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameDecoder.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +final class MessageFrameDecoder extends LengthFieldBasedFrameDecoder { + MessageFrameDecoder() { + super(Constants.LENGTH_FIELD_MAX, 0, Constants.LENGTH_FIELD_LENGTH, 0, Constants.LENGTH_FIELD_LENGTH); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java new file mode 100644 index 0000000000..f6d934781f --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.handler.codec.LengthFieldPrepender; + +@Sharable +final class MessageFrameEncoder extends LengthFieldPrepender { + private static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder(); + + private MessageFrameEncoder() { + super(Constants.LENGTH_FIELD_LENGTH); + } + + static MessageFrameEncoder instance() { + return INSTANCE; + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java new file mode 100644 index 0000000000..7e396e8ba4 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static com.google.common.base.Verify.verify; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.time.Duration; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.yangtools.concepts.AbstractRegistration; +import org.opendaylight.yangtools.concepts.Registration; + +public final class NettyReplication { + private static final class Disabled extends AbstractRegistration { + @Override + protected void removeRegistration() { + // no-op + } + } + + private NettyReplication() { + // Hidden on purpose + } + + public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, + final ClusterSingletonServiceProvider singletonService, final boolean enabled, + final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) { + return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport, + dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled(); + } + + public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, + final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) { + final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class); + verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker); + + return enabled ? singletonService.registerClusterSingletonService(new SourceSingletonService(bootstrapSupport, + dtcs, listenPort)) : new Disabled(); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NioBootstrapSupport.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NioBootstrapSupport.java new file mode 100644 index 0000000000..116aa6c0bd --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NioBootstrapSupport.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +final class NioBootstrapSupport extends AbstractBootstrapSupport { + NioBootstrapSupport() { + super(NioSocketChannel.class, NioServerSocketChannel.class, new NioEventLoopGroup(), new NioEventLoopGroup()); + } +} \ No newline at end of file diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java new file mode 100644 index 0000000000..7ead9c26e2 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.replicate.common.DataTreeCandidateUtils; +import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class SinkRequestHandler extends SimpleChannelInboundHandler { + private static final Logger LOG = LoggerFactory.getLogger(SinkRequestHandler.class); + + private final ReusableStreamReceiver receiver = ReusableImmutableNormalizedNodeStreamWriter.create(); + private final List chunks = new ArrayList<>(); + private final DOMDataTreeIdentifier tree; + private final DOMTransactionChain chain; + + SinkRequestHandler(final DOMDataTreeIdentifier tree, final DOMTransactionChain chain) { + this.tree = requireNonNull(tree); + this.chain = requireNonNull(chain); + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws IOException { + verify(msg.isReadable(), "Empty message received"); + + final short msgType = msg.readUnsignedByte(); + final Channel channel = ctx.channel(); + LOG.trace("Channel {} received message type {}", channel, msgType); + switch (msgType) { + case Constants.MSG_EMPTY_DATA: + handleEmptyData(); + break; + case Constants.MSG_DTC_CHUNK: + chunks.add(msg); + break; + case Constants.MSG_DTC_APPLY: + handleDtcApply(); + break; + default: + throw new IllegalStateException("Unexpected message type " + msgType); + } + } + + private void handleEmptyData() { + final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction(); + tx.delete(tree.getDatastoreType(), tree.getRootIdentifier()); + commit(tx); + } + + private void handleDtcApply() throws IOException { + checkState(!chunks.isEmpty(), "No chunks to apply"); + + final ByteBuf bufs = Unpooled.wrappedBuffer(chunks.toArray(new ByteBuf[0])); + chunks.clear(); + + final DataTreeCandidate candidate; + try (ByteBufInputStream stream = new ByteBufInputStream(bufs)) { + candidate = DataTreeCandidateInputOutput.readDataTreeCandidate(NormalizedNodeDataInput.newDataInput(stream), + receiver); + } + + final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction(); + DataTreeCandidateUtils.applyToTransaction(tx, tree.getDatastoreType(), candidate); + commit(tx); + } + + private static void commit(final DOMDataTreeWriteTransaction tx) { + tx.commit().addCallback(new FutureCallback() { + @Override + public void onSuccess(final CommitInfo result) { + LOG.trace("Transaction committed with {}", result); + } + + @Override + public void onFailure(final Throwable cause) { + // Handled by transaction chain listener + } + }, MoreExecutors.directExecutor()); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java new file mode 100644 index 0000000000..c2b5f4a838 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.ListenableFuture; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.util.concurrent.Future; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class SinkSingletonService implements ClusterSingletonService { + private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class); + private static final ServiceGroupIdentifier SGID = + ServiceGroupIdentifier.create(SinkSingletonService.class.getName()); + // TODO: allow different trees? + private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, + YangInstanceIdentifier.empty()); + private static final ByteBuf TREE_REQUEST; + + static { + try { + TREE_REQUEST = requestTree(TREE); + } catch (IOException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final BootstrapSupport bootstrapSupport; + private final DOMDataBroker dataBroker; + private final InetSocketAddress sourceAddress; + private final Duration reconnectDelay; + + @GuardedBy("this") + private ChannelFuture futureChannel; + + SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, + final InetSocketAddress sourceAddress, final Duration reconnectDelay) { + this.bootstrapSupport = requireNonNull(bootstrapSupport); + this.dataBroker = requireNonNull(dataBroker); + this.sourceAddress = requireNonNull(sourceAddress); + this.reconnectDelay = requireNonNull(reconnectDelay); + } + + @Override + public ServiceGroupIdentifier getIdentifier() { + return SGID; + } + + @Override + public synchronized void instantiateServiceInstance() { + LOG.info("Replication sink started with source {}", sourceAddress); + + final Bootstrap bs = bootstrapSupport.newBootstrap(); + final ScheduledExecutorService group = bs.config().group(); + + futureChannel = bs + .option(ChannelOption.SO_KEEPALIVE, true) + .connect(sourceAddress, null); + + futureChannel.addListener(compl -> channelResolved(compl, group)); + } + + @Override + public synchronized ListenableFuture closeServiceInstance() { + // TODO Auto-generated method stub + return null; + } + + private synchronized void channelResolved(final Future completedFuture, final ScheduledExecutorService group) { + if (completedFuture != futureChannel) { + // Future changed, this callback is irrelevant + return; + } + + final Channel channel = futureChannel.channel(); + channel.pipeline() + .addLast("frameDecoder", new MessageFrameDecoder()) + .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( + new SinkTransactionChainListener(channel)))) + .addLast("frameEncoder", MessageFrameEncoder.instance()); + + channel.writeAndFlush(TREE_REQUEST); + } + + private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException { + final ByteBuf ret = Unpooled.buffer(); + + try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) { + try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) { + tree.getDatastoreType().writeTo(output); + output.writeYangInstanceIdentifier(tree.getRootIdentifier()); + } + } + + return ret; + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkTransactionChainListener.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkTransactionChainListener.java new file mode 100644 index 0000000000..8b6c139d0d --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkTransactionChainListener.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static java.util.Objects.requireNonNull; + +import io.netty.channel.Channel; +import org.opendaylight.mdsal.dom.api.DOMDataTreeTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class SinkTransactionChainListener implements DOMTransactionChainListener { + private static final Logger LOG = LoggerFactory.getLogger(SinkTransactionChainListener.class); + + private final Channel channel; + + SinkTransactionChainListener(final Channel channel) { + this.channel = requireNonNull(channel); + } + + @Override + public void onTransactionChainFailed(final DOMTransactionChain chain, final DOMDataTreeTransaction transaction, + final Throwable cause) { + LOG.error("Transaction chain for channel {} failed", channel, cause); + channel.close(); + } + + @Override + public void onTransactionChainSuccessful(final DOMTransactionChain chain) { + LOG.info("Transaction chain for channel {} completed", channel); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java new file mode 100644 index 0000000000..f302ca156d --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import java.io.IOException; +import java.util.Collection; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Final inbound handler on source side. Handles requests coming from sink and reacts to them. + */ +final class SourceRequestHandler extends SimpleChannelInboundHandler { + private static final Logger LOG = LoggerFactory.getLogger(SourceRequestHandler.class); + + private final DOMDataTreeChangeService dtcs; + + private ListenerRegistration reg; + + SourceRequestHandler(final DOMDataTreeChangeService dtcs) { + this.dtcs = requireNonNull(dtcs); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) { + LOG.trace("Channel {} going inactive", ctx.channel()); + if (reg != null) { + reg.close(); + reg = null; + } + ctx.fireChannelInactive(); + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws IOException { + verify(msg.isReadable(), "Empty message received"); + + final short msgType = msg.readUnsignedByte(); + final Channel channel = ctx.channel(); + LOG.trace("Channel {} received message type {}", channel, msgType); + switch (msgType) { + case Constants.MSG_SUBSCRIBE_REQ: + subscribe(channel, msg); + break; + default: + throw new IllegalStateException("Unexpected message type " + msgType); + } + } + + private void subscribe(final Channel channel, final ByteBuf msg) throws IOException { + verify(reg == null, "Unexpected subscription when already subscribed"); + + final DOMDataTreeIdentifier dataTree; + try (ByteBufInputStream input = new ByteBufInputStream(msg)) { + final NormalizedNodeDataInput normalizedInput = NormalizedNodeDataInput.newDataInput(input); + + dataTree = new DOMDataTreeIdentifier(LogicalDatastoreType.readFrom(normalizedInput), + normalizedInput.readYangInstanceIdentifier()); + } + + LOG.info("Channel {} subscribing to {}", channel, dataTree); + reg = dtcs.registerDataTreeChangeListener(dataTree, new ClusteredDOMDataTreeChangeListener() { + @Override + public void onInitialData() { + channel.writeAndFlush(AbstractSourceMessage.empty()); + } + + @Override + public void onDataTreeChanged(final Collection changes) { + channel.writeAndFlush(AbstractSourceMessage.of(changes)); + } + }); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java new file mode 100644 index 0000000000..415637269b --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source + * for a particular port. + */ +final class SourceSingletonService extends ChannelInitializer implements ClusterSingletonService { + private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class); + private static final ServiceGroupIdentifier SGID = + ServiceGroupIdentifier.create(SourceSingletonService.class.getName()); + + private final BootstrapSupport bootstrapSupport; + private final DOMDataTreeChangeService dtcs; + private final int listenPort; + + @GuardedBy("this") + private final Collection children = new HashSet<>(); + @GuardedBy("this") + private Channel serverChannel; + + SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs, + final int listenPort) { + this.bootstrapSupport = requireNonNull(bootstrapSupport); + this.dtcs = requireNonNull(dtcs); + this.listenPort = listenPort; + } + + @Override + public ServiceGroupIdentifier getIdentifier() { + return SGID; + } + + @Override + public synchronized void instantiateServiceInstance() { + final ChannelFuture future = bootstrapSupport.newServerBootstrap() + .option(ChannelOption.SO_BACKLOG, 3) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(this) + .bind(listenPort); + + try { + future.sync(); + } catch (InterruptedException e) { + throw new IllegalStateException("Failed to bind port " + listenPort, e); + } + + serverChannel = future.channel(); + LOG.info("Replication source started on port {}", listenPort); + } + + @Override + public synchronized ListenableFuture closeServiceInstance() { + LOG.info("Replication source on port {} shutting down", listenPort); + + final List> futures = new ArrayList<>(); + + // Close server channel + futures.add(closeChannel(serverChannel)); + serverChannel = null; + + // Close all child channels + for (SocketChannel channel : children) { + futures.add(closeChannel(channel)); + } + children.clear(); + + final ListenableFuture ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures)); + ret.addListener(() -> { + LOG.info("Replication source on port {} shut down", listenPort); + }, MoreExecutors.directExecutor()); + return ret; + } + + @Override + public synchronized void initChannel(final SocketChannel ch) { + if (serverChannel == null) { + LOG.debug("Channel {} established while shutting down, closing it", ch); + ch.close(); + return; + } + + ch.pipeline() + .addLast("frameDecoder", new MessageFrameDecoder()) + .addLast("requestHandler", new SourceRequestHandler(dtcs)) + .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current())) + .addLast("frameEncoder", MessageFrameEncoder.instance()); + children.add(ch); + + LOG.debug("Channel {} established", ch); + } + + private static ListenableFuture closeChannel(final Channel ch) { + final SettableFuture ret = SettableFuture.create(); + ch.closeFuture().addListener(chf -> { + final Throwable cause = chf.cause(); + if (cause != null) { + ret.setException(cause); + } else { + ret.set(null); + } + }); + + ch.close(); + return ret; + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SplittingOutputStream.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SplittingOutputStream.java new file mode 100644 index 0000000000..a01b7bdeda --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SplittingOutputStream.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.Objects; + +/** + * An OutputStream which makes sure to slice messages to a maximum size. This prevents array reallocations and + * GC thrashing on huge objects. + */ +final class SplittingOutputStream extends OutputStream { + private static final int INIT_BUF = 4096; + + static { + verify(INIT_BUF <= Constants.LENGTH_FIELD_MAX); + } + + private final List out; + + private ByteBuf buf; + + SplittingOutputStream(final List out) { + this.out = requireNonNull(out); + allocBuffer(); + } + + @Override + @SuppressWarnings("checkstyle:parameterName") + public void write(final int b) throws IOException { + buf.writeByte(b); + checkSend(); + } + + @Override + @SuppressWarnings("checkstyle:parameterName") + public void write(final byte[] b, final int off, final int len) throws IOException { + if (off < 0 || len < 0) { + throw new IndexOutOfBoundsException(); + } + + int left = len; + int ptr = off; + while (left > 0) { + final int chunk = Math.min(Constants.LENGTH_FIELD_MAX - buf.writerIndex(), left); + + buf.writeBytes(b, ptr, chunk); + ptr += chunk; + left -= chunk; + checkSend(); + } + } + + @Override + public void close() { + if (buf.writerIndex() != 0) { + out.add(buf); + } + buf = null; + } + + private void allocBuffer() { + // FIXME: use buffer allocator? + buf = Unpooled.buffer(INIT_BUF, Constants.LENGTH_FIELD_MAX); + buf.writeByte(Constants.MSG_DTC_CHUNK); + } + + private void checkSend() { + if (buf.writerIndex() == Constants.LENGTH_FIELD_MAX) { + out.add(buf); + allocBuffer(); + } + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml new file mode 100644 index 0000000000..00dc59516c --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml @@ -0,0 +1,6 @@ + + + + + diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml new file mode 100644 index 0000000000..9a76c3d546 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml new file mode 100644 index 0000000000..14a2d4c436 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/replicate/pom.xml b/replicate/pom.xml new file mode 100644 index 0000000000..6920154fdc --- /dev/null +++ b/replicate/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + + org.opendaylight.odlparent + odlparent-lite + 5.0.8 + + + + org.opendaylight.mdsal + mdsal-replicate + 4.0.15-SNAPSHOT + pom + + + true + true + + + + mdsal-replicate-common + mdsal-replicate-netty + +