c2b5f4a83822488efb4be54903adf09b90a3139c
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SinkSingletonService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.replicate.netty;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.ListenableFuture;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.buffer.ByteBuf;
15 import io.netty.buffer.ByteBufOutputStream;
16 import io.netty.buffer.Unpooled;
17 import io.netty.channel.Channel;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelOption;
20 import io.netty.util.concurrent.Future;
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.time.Duration;
24 import java.util.concurrent.ScheduledExecutorService;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
30 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
33 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 final class SinkSingletonService implements ClusterSingletonService {
38     private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
39     private static final ServiceGroupIdentifier SGID =
40             ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
41     // TODO: allow different trees?
42     private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
43         YangInstanceIdentifier.empty());
44     private static final ByteBuf TREE_REQUEST;
45
46     static {
47         try {
48             TREE_REQUEST = requestTree(TREE);
49         } catch (IOException e) {
50             throw new ExceptionInInitializerError(e);
51         }
52     }
53
54     private final BootstrapSupport bootstrapSupport;
55     private final DOMDataBroker dataBroker;
56     private final InetSocketAddress sourceAddress;
57     private final Duration reconnectDelay;
58
59     @GuardedBy("this")
60     private ChannelFuture futureChannel;
61
62     SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
63             final InetSocketAddress sourceAddress, final Duration reconnectDelay) {
64         this.bootstrapSupport = requireNonNull(bootstrapSupport);
65         this.dataBroker = requireNonNull(dataBroker);
66         this.sourceAddress = requireNonNull(sourceAddress);
67         this.reconnectDelay = requireNonNull(reconnectDelay);
68     }
69
70     @Override
71     public ServiceGroupIdentifier getIdentifier() {
72         return SGID;
73     }
74
75     @Override
76     public synchronized void instantiateServiceInstance() {
77         LOG.info("Replication sink started with source {}", sourceAddress);
78
79         final Bootstrap bs = bootstrapSupport.newBootstrap();
80         final ScheduledExecutorService group = bs.config().group();
81
82         futureChannel = bs
83                 .option(ChannelOption.SO_KEEPALIVE, true)
84                 .connect(sourceAddress, null);
85
86         futureChannel.addListener(compl -> channelResolved(compl, group));
87     }
88
89     @Override
90     public synchronized ListenableFuture<?> closeServiceInstance() {
91         // TODO Auto-generated method stub
92         return null;
93     }
94
95     private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
96         if (completedFuture != futureChannel) {
97             // Future changed, this callback is irrelevant
98             return;
99         }
100
101         final Channel channel = futureChannel.channel();
102         channel.pipeline()
103             .addLast("frameDecoder", new MessageFrameDecoder())
104             .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
105                 new SinkTransactionChainListener(channel))))
106             .addLast("frameEncoder", MessageFrameEncoder.instance());
107
108         channel.writeAndFlush(TREE_REQUEST);
109     }
110
111     private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
112         final ByteBuf ret = Unpooled.buffer();
113
114         try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
115             try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
116                 tree.getDatastoreType().writeTo(output);
117                 output.writeYangInstanceIdentifier(tree.getRootIdentifier());
118             }
119         }
120
121         return ret;
122     }
123 }