/* * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.mdsal.replicate.netty; import static java.util.Objects.requireNonNull; import com.google.common.util.concurrent.ListenableFuture; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.util.concurrent.Future; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class SinkSingletonService implements ClusterSingletonService { private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class); private static final ServiceGroupIdentifier SGID = ServiceGroupIdentifier.create(SinkSingletonService.class.getName()); // TODO: allow different trees? private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); private static final ByteBuf TREE_REQUEST; static { try { TREE_REQUEST = requestTree(TREE); } catch (IOException e) { throw new ExceptionInInitializerError(e); } } private final BootstrapSupport bootstrapSupport; private final DOMDataBroker dataBroker; private final InetSocketAddress sourceAddress; private final Duration reconnectDelay; @GuardedBy("this") private ChannelFuture futureChannel; SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, final InetSocketAddress sourceAddress, final Duration reconnectDelay) { this.bootstrapSupport = requireNonNull(bootstrapSupport); this.dataBroker = requireNonNull(dataBroker); this.sourceAddress = requireNonNull(sourceAddress); this.reconnectDelay = requireNonNull(reconnectDelay); } @Override public ServiceGroupIdentifier getIdentifier() { return SGID; } @Override public synchronized void instantiateServiceInstance() { LOG.info("Replication sink started with source {}", sourceAddress); final Bootstrap bs = bootstrapSupport.newBootstrap(); final ScheduledExecutorService group = bs.config().group(); futureChannel = bs .option(ChannelOption.SO_KEEPALIVE, true) .connect(sourceAddress, null); futureChannel.addListener(compl -> channelResolved(compl, group)); } @Override public synchronized ListenableFuture closeServiceInstance() { // TODO Auto-generated method stub return null; } private synchronized void channelResolved(final Future completedFuture, final ScheduledExecutorService group) { if (completedFuture != futureChannel) { // Future changed, this callback is irrelevant return; } final Channel channel = futureChannel.channel(); channel.pipeline() .addLast("frameDecoder", new MessageFrameDecoder()) .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( new SinkTransactionChainListener(channel)))) .addLast("frameEncoder", MessageFrameEncoder.instance()); channel.writeAndFlush(TREE_REQUEST); } private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException { final ByteBuf ret = Unpooled.buffer(); try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) { try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) { tree.getDatastoreType().writeTo(output); output.writeYangInstanceIdentifier(tree.getRootIdentifier()); } } return ret; } }