Switch to MD-SAL APIs
[openflowplugin.git] / applications / forwardingrules-sync / src / test / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecoratorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frsync.impl;
9
10 import com.google.common.util.concurrent.Futures;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.ListeningExecutorService;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.TimeUnit;
21 import org.junit.After;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.junit.runner.RunWith;
25 import org.mockito.ArgumentMatchers;
26 import org.mockito.InOrder;
27 import org.mockito.Mock;
28 import org.mockito.Mockito;
29 import org.mockito.junit.MockitoJUnitRunner;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
32 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Test for {@link SyncReactorFutureZipDecorator}.
44  */
45 @RunWith(MockitoJUnitRunner.class)
46 public class SyncReactorFutureZipDecoratorTest {
47
48     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
49     private static final NodeId NODE_ID = new NodeId("testNode");
50     private SyncReactorFutureZipDecorator reactor;
51     private InstanceIdentifier<FlowCapableNode> fcNodePath;
52     private ListeningExecutorService syncThreadPool;
53     private final LogicalDatastoreType configDS = LogicalDatastoreType.CONFIGURATION;
54     private final LogicalDatastoreType operationalDS = LogicalDatastoreType.OPERATIONAL;
55
56     @Mock
57     private SyncReactor delegate;
58     @Mock
59     private SyncupEntry syncupEntry;
60
61     @Before
62     public void setUp() {
63         final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
64                 .setDaemon(false)
65                 .setNameFormat("frsync-test-%d")
66                 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
67                 .build());
68         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
69         reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
70         fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
71                 .augmentation(FlowCapableNode.class);
72     }
73
74     @Test
75     public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
76         final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
77         final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
78         final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
79         final CountDownLatch latchForFirst = new CountDownLatch(1);
80         final CountDownLatch latchForNext = new CountDownLatch(1);
81
82         final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, operationalDS);
83         final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
84         final SyncupEntry third = new SyncupEntry(null, configDS, dataAfter, configDS);
85         final SyncupEntry fourth = new SyncupEntry(dataAfter2, configDS, null, configDS);
86         final SyncupEntry zipped = new SyncupEntry(dataAfter2, configDS, dataBefore, configDS);
87         final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
88
89         Mockito.when(delegate.syncup(ArgumentMatchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
90                 .thenAnswer(invocationOnMock -> {
91                     LOG.info("unlocking next configs");
92                     latchForNext.countDown();
93                     latchForFirst.await();
94                     LOG.info("unlocking first delegate");
95                     return Futures.immediateFuture(Boolean.TRUE);
96                 });
97
98         allResults.add(reactor.syncup(fcNodePath, first));
99         latchForNext.await();
100
101         mockSyncupWithEntry(second);
102         allResults.add(reactor.syncup(fcNodePath, second));
103         mockSyncupWithEntry(third);
104         allResults.add(reactor.syncup(fcNodePath, third));
105         mockSyncupWithEntry(fourth);
106         allResults.add(reactor.syncup(fcNodePath, fourth));
107         latchForFirst.countDown();
108
109         Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
110         LOG.info("all configs done");
111
112         syncThreadPool.shutdown();
113         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
114         if (!terminated) {
115             LOG.info("thread pool not terminated.");
116             syncThreadPool.shutdownNow();
117         }
118         final InOrder inOrder = Mockito.inOrder(delegate);
119         inOrder.verify(delegate).syncup(fcNodePath, first);
120         inOrder.verify(delegate).syncup(fcNodePath, zipped);
121         inOrder.verifyNoMoreInteractions();
122     }
123
124     @Test
125     public void testSyncupConfigEmptyQueue() throws Exception {
126         final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
127         final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
128         final CountDownLatch latchForNext = new CountDownLatch(1);
129
130         final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, configDS);
131         final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
132
133         Mockito.when(delegate.syncup(ArgumentMatchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
134                 .thenAnswer(invocationOnMock -> {
135                     LOG.info("unlocking next config");
136                     latchForNext.countDown();
137                     return Futures.immediateFuture(Boolean.TRUE);
138                 });
139
140         reactor.syncup(fcNodePath, first);
141         latchForNext.await();
142         mockSyncupWithEntry(second);
143         reactor.syncup(fcNodePath, second);
144
145         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
146         if (!terminated) {
147             LOG.info("thread pool not terminated.");
148             syncThreadPool.shutdownNow();
149         }
150         final InOrder inOrder = Mockito.inOrder(delegate);
151         inOrder.verify(delegate).syncup(fcNodePath, first);
152         inOrder.verify(delegate).syncup(fcNodePath, second);
153         inOrder.verifyNoMoreInteractions();
154     }
155
156     @Test
157     public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
158         final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
159         final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
160         final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
161         final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
162         final CountDownLatch latchForFirst = new CountDownLatch(1);
163         final CountDownLatch latchForNext = new CountDownLatch(1);
164
165         final SyncupEntry first = new SyncupEntry(configAfter, configDS, configBefore, configDS);
166         final SyncupEntry second = new SyncupEntry(configActual, configDS, freshOperational, operationalDS);
167
168         Mockito.when(delegate.syncup(ArgumentMatchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
169                 .thenAnswer(invocationOnMock -> {
170                     LOG.info("unlocking for fresh operational");
171                     latchForNext.countDown();
172                     latchForFirst.await();
173                     LOG.info("unlocking first delegate");
174                     return Futures.immediateFuture(Boolean.TRUE);
175                 });
176
177         reactor.syncup(fcNodePath, first);
178         latchForNext.await();
179
180         mockSyncupWithEntry(second);
181         reactor.syncup(fcNodePath, second);
182         latchForFirst.countDown();
183
184         syncThreadPool.shutdown();
185         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
186         if (!terminated) {
187             LOG.info("thread pool not terminated.");
188             syncThreadPool.shutdownNow();
189         }
190         Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, second);
191     }
192
193     private void mockSyncupWithEntry(final SyncupEntry entry) {
194         Mockito.when(delegate.syncup(ArgumentMatchers.any(), Mockito.eq(entry)))
195                 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
196     }
197
198     @After
199     public void tearDown() {
200         syncThreadPool.shutdownNow();
201     }
202 }