a2f1920e8bc05696d850ef3923aaf3d80b56b77c
[mdsal.git] / replicate / mdsal-replicate-netty / src / test / java / org / opendaylight / mdsal / replicate / netty / IntegrationTest.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.replicate.netty;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.ArgumentMatchers.any;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.timeout;
16 import static org.mockito.Mockito.verify;
17
18 import java.net.Inet4Address;
19 import java.time.Duration;
20 import java.util.concurrent.ExecutionException;
21 import org.junit.After;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.junit.runner.RunWith;
25 import org.mockito.ArgumentCaptor;
26 import org.mockito.junit.MockitoJUnitRunner;
27 import org.opendaylight.mdsal.binding.api.DataBroker;
28 import org.opendaylight.mdsal.binding.api.WriteTransaction;
29 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
35 import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
36 import org.opendaylight.mdsal.singleton.dom.impl.EOSClusterSingletonServiceProvider;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityKey;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.opendaylight.yangtools.yang.common.QName;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49
50 @RunWith(MockitoJUnitRunner.StrictStubs.class)
51 public class IntegrationTest extends AbstractDataBrokerTest {
52     private static final int TEST_PORT = 4000;
53
54     private static final QName ENTITY_QNAME = Entity.QNAME;
55     private static final QName ENTITY_NAME_QNAME = QName.create(ENTITY_QNAME, "name");
56
57     private AbstractBootstrapSupport support;
58     private EOSClusterSingletonServiceProvider css;
59
60     @Before
61     public void before() {
62         support = AbstractBootstrapSupport.create();
63         css = new EOSClusterSingletonServiceProvider(new SimpleDOMEntityOwnershipService());
64     }
65
66     @After
67     public void after() throws Exception {
68         support.close();
69         css.close();
70     }
71
72     /**
73      * Connect the Source and Sink. Verify that the initial state returned from Source was empty. After that make some
74      * changes to the Source's datastore and verify that they were all replicated to the Sink
75      */
76     @Test
77     public void testSourceToSink() throws InterruptedException, ExecutionException {
78         // Make sure to start source...
79         try (var source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
80             Duration.ZERO, 5)) {
81             // ... and give it some time start up and open up the port
82             Thread.sleep(1000);
83
84             // Mocking for sink...
85             final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
86             final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
87             doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
88             doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
89             final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
90             doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
91
92             // Kick of the sink ...
93             try (var sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
94                 Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3)) {
95                 // ... and sync on it starting up
96
97                 // verify the connection was established and MSG_EMPTY_DATA was transferred
98                 verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
99                 verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION),
100                     eq(YangInstanceIdentifier.of()), any(ContainerNode.class));
101
102                 // generate some deltas
103                 final int deltaCount = 5;
104                 generateModification(getDataBroker(), deltaCount);
105
106                 // verify that all the deltas were transferred and committed + 1 invocation from receiving
107                 // MSG_EMPTY_DATA
108                 verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction();
109                 verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit();
110             }
111         }
112     }
113
114     /**
115      * Add some data first and then start replication. Only 1 change is expected to return from the Source and it
116      * should contain the initial state of the Source's datastore.
117      */
118     @Test
119     public void testReplicateInitialState() throws InterruptedException, ExecutionException {
120         // add some data to datastore
121         final int deltaCount = 5;
122         generateModification(getDataBroker(), deltaCount);
123
124         // Make sure to start source...
125         try (var source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
126             Duration.ZERO, 5)) {
127             // ... and give it some time start up and open up the port
128             Thread.sleep(1000);
129
130             // Mocking for sink...
131             final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
132             final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
133             doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
134             doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
135             final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
136             doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
137
138             // Kick of the sink ...
139             try (var sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
140                 Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3)) {
141                 // ... and sync on it starting up
142
143                 // verify the connection was established and MSG_EMPTY_DATA was transferred
144                 verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
145                 verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
146
147                 // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
148                 final var dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
149                 verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
150                 // verify that the initial state contains everything
151                 NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next();
152                 NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
153                 assertEquals(expectedEntityState, capturedInitialState);
154
155                 verify(sinkTx, timeout(2000).times(1)).commit();
156             }
157         }
158     }
159
160     private static ContainerNode generateNormalizedNodeForEntities(final int amount) {
161         final var builder = ImmutableNodes.newSystemMapBuilder().withNodeIdentifier(new NodeIdentifier(ENTITY_QNAME));
162         for (int i = 0; i < amount; i++) {
163             final var name = "testEntity" + i;
164             builder.withChild(ImmutableNodes.newMapEntryBuilder()
165                 .withNodeIdentifier(NodeIdentifierWithPredicates.of(ENTITY_QNAME, ENTITY_NAME_QNAME, name))
166                 .build());
167         }
168
169         return ImmutableNodes.newContainerBuilder()
170                 .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
171                 .withChild(builder.build())
172                 .build();
173     }
174
175     private static void generateModification(final DataBroker broker, final int amount)
176             throws InterruptedException, ExecutionException {
177         for (int i = 0; i < amount; i++) {
178             final WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
179             final EntityKey key = new EntityKey("testEntity" + i);
180
181             writeTransaction.put(LogicalDatastoreType.CONFIGURATION,
182                 InstanceIdentifier.builder(Entity.class, key).build(), new EntityBuilder().withKey(key).build());
183             writeTransaction.commit().get();
184         }
185     }
186 }