Netty Replicator - improve the reconnection and keepalive mechanisms
[mdsal.git] / replicate / mdsal-replicate-netty / src / test / java / org / opendaylight / mdsal / replicate / netty / IntegrationTest.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.replicate.netty;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.ArgumentMatchers.any;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.timeout;
16 import static org.mockito.Mockito.verify;
17
18 import java.net.Inet4Address;
19 import java.time.Duration;
20 import java.util.concurrent.ExecutionException;
21 import org.junit.After;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.junit.runner.RunWith;
25 import org.mockito.ArgumentCaptor;
26 import org.mockito.junit.MockitoJUnitRunner;
27 import org.opendaylight.mdsal.binding.api.DataBroker;
28 import org.opendaylight.mdsal.binding.api.WriteTransaction;
29 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
35 import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
36 import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityKey;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.opendaylight.yangtools.yang.common.QName;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
49 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
51 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53
54 @RunWith(MockitoJUnitRunner.StrictStubs.class)
55 public class IntegrationTest extends AbstractDataBrokerTest {
56     private static final int TEST_PORT = 4000;
57
58     private static final QName ENTITY_QNAME = Entity.QNAME;
59     private static final QName ENTITY_NAME_QNAME = QName.create(ENTITY_QNAME, "name");
60
61     private AbstractBootstrapSupport support;
62     private DOMClusterSingletonServiceProviderImpl css;
63
64     @Before
65     public void before() {
66         support = AbstractBootstrapSupport.create();
67         css = new DOMClusterSingletonServiceProviderImpl(new SimpleDOMEntityOwnershipService());
68         css.initializeProvider();
69     }
70
71     @After
72     public void after() throws InterruptedException {
73         support.close();
74         css.close();
75     }
76
77     /**
78      * Connect the Source and Sink. Verify that the initial state returned from Source was empty. After that make some
79      * changes to the Source's datastore and verify that they were all replicated to the Sink
80      */
81     @Test
82     public void testSourceToSink() throws InterruptedException, ExecutionException {
83         // Make sure to start source...
84         final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT,
85             Duration.ZERO, 5);
86         // ... and give it some time start up and open up the port
87         Thread.sleep(1000);
88
89         // Mocking for sink...
90         final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
91         final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
92         doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
93         doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
94         final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
95         doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
96
97         // Kick of the sink ...
98         final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
99             Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
100         // ... and sync on it starting up
101
102         // verify the connection was established and MSG_EMPTY_DATA was transferred
103         verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
104         verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.empty()),
105             any(ContainerNode.class));
106
107         // generate some deltas
108         final int deltaCount = 5;
109         generateModification(getDataBroker(), deltaCount);
110
111         // verify that all the deltas were transferred and committed + 1 invocation from receiving MSG_EMPTY_DATA
112         verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction();
113         verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit();
114
115         sink.close();
116         source.close();
117     }
118
119     /**
120      * Add some data first and then start replication. Only 1 change is expected to return from the Source and it
121      * should contain the initial state of the Source's datastore.
122      */
123     @Test
124     public void testReplicateInitialState() throws InterruptedException, ExecutionException {
125         // add some data to datastore
126         final int deltaCount = 5;
127         generateModification(getDataBroker(), deltaCount);
128
129         // Make sure to start source...
130         final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT,
131             Duration.ZERO, 5);
132         // ... and give it some time start up and open up the port
133         Thread.sleep(1000);
134
135         // Mocking for sink...
136         final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
137         final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
138         doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
139         doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
140         final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
141         doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
142
143         // Kick of the sink ...
144         final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
145             Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
146         // ... and sync on it starting up
147
148         // verify the connection was established and MSG_EMPTY_DATA was transferred
149         verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
150         verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
151
152         // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
153         ArgumentCaptor<NormalizedNode<?, ?>> dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
154         verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
155         // verify that the initial state contains everything
156         NormalizedNode<?, ?> capturedInitialState = dataCaptor.getAllValues().iterator().next();
157         NormalizedNode<?, ?> expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
158         assertEquals(expectedEntityState, capturedInitialState);
159
160         verify(sinkTx, timeout(2000).times(1)).commit();
161
162         sink.close();
163         source.close();
164     }
165
166     private static ContainerNode generateNormalizedNodeForEntities(final int amount) {
167         final CollectionNodeBuilder<MapEntryNode, MapNode> builder = ImmutableNodes.mapNodeBuilder(ENTITY_QNAME);
168         for (int i = 0; i < amount; i++) {
169             builder.withChild(ImmutableNodes.mapEntry(ENTITY_QNAME, ENTITY_NAME_QNAME, "testEntity" + i));
170         }
171
172         return Builders.containerBuilder()
173                 .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
174                 .withChild(builder.build())
175                 .build();
176     }
177
178     private static void generateModification(final DataBroker broker, final int amount)
179             throws InterruptedException, ExecutionException {
180         for (int i = 0; i < amount; i++) {
181             final WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
182             final EntityKey key = new EntityKey("testEntity" + i);
183
184             writeTransaction.put(LogicalDatastoreType.CONFIGURATION,
185                 InstanceIdentifier.builder(Entity.class, key).build(), new EntityBuilder().withKey(key).build());
186             writeTransaction.commit().get();
187         }
188     }
189 }