Revert "Bug 6745 Do not ignore syncup return value"
[openflowplugin.git] / applications / forwardingrules-sync / src / test / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecoratorTest.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.junit.runner.RunWith;
26 import org.mockito.InOrder;
27 import org.mockito.Matchers;
28 import org.mockito.Mock;
29 import org.mockito.Mockito;
30 import org.mockito.invocation.InvocationOnMock;
31 import org.mockito.runners.MockitoJUnitRunner;
32 import org.mockito.stubbing.Answer;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
35 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Test for {@link SyncReactorFutureZipDecorator}.
47  */
48 @RunWith(MockitoJUnitRunner.class)
49 public class SyncReactorFutureZipDecoratorTest {
50
51     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
52     private static final NodeId NODE_ID = new NodeId("testNode");
53     private SyncReactorFutureZipDecorator reactor;
54     private InstanceIdentifier<FlowCapableNode> fcNodePath;
55     private ListeningExecutorService syncThreadPool;
56     private final LogicalDatastoreType configDS = LogicalDatastoreType.CONFIGURATION;
57     private final LogicalDatastoreType operationalDS = LogicalDatastoreType.OPERATIONAL;
58
59     @Mock
60     private SyncReactor delegate;
61     @Mock
62     private SyncupEntry syncupEntry;
63
64     @Before
65     public void setUp() {
66         final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
67                 .setDaemon(false)
68                 .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)
69                 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
70                 .build());
71         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
72         reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
73         fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
74                 .augmentation(FlowCapableNode.class);
75     }
76
77     @Test
78     public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
79         final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
80         final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
81         final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
82         final CountDownLatch latchForFirst = new CountDownLatch(1);
83         final CountDownLatch latchForNext = new CountDownLatch(1);
84
85         final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, operationalDS);
86         final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
87         final SyncupEntry third = new SyncupEntry(null, configDS, dataAfter, configDS);
88         final SyncupEntry fourth = new SyncupEntry(dataAfter2, configDS, null, configDS);
89         final SyncupEntry zipped = new SyncupEntry(dataAfter2, configDS, dataBefore, configDS);
90         final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
91
92         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
93                 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
94                     @Override
95                     public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
96                         LOG.info("unlocking next configs");
97                         latchForNext.countDown();
98                         latchForFirst.await();
99                         LOG.info("unlocking first delegate");
100                         return Futures.immediateFuture(Boolean.TRUE);
101                     }
102                 });
103
104         allResults.add(reactor.syncup(fcNodePath, first));
105         latchForNext.await();
106
107         mockSyncupWithEntry(second);
108         allResults.add(reactor.syncup(fcNodePath, second));
109         mockSyncupWithEntry(third);
110         allResults.add(reactor.syncup(fcNodePath, third));
111         mockSyncupWithEntry(fourth);
112         allResults.add(reactor.syncup(fcNodePath, fourth));
113         latchForFirst.countDown();
114
115         Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
116         LOG.info("all configs done");
117
118         syncThreadPool.shutdown();
119         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
120         if (!terminated) {
121             LOG.info("thread pool not terminated.");
122             syncThreadPool.shutdownNow();
123         }
124         final InOrder inOrder = Mockito.inOrder(delegate);
125         inOrder.verify(delegate).syncup(fcNodePath, first);
126         inOrder.verify(delegate).syncup(fcNodePath, zipped);
127         inOrder.verifyNoMoreInteractions();
128     }
129
130     @Test
131     public void testSyncupConfigEmptyQueue() throws Exception {
132         final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
133         final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
134         final CountDownLatch latchForNext = new CountDownLatch(1);
135
136         final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, configDS);
137         final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
138
139         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
140                 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
141             @Override
142             public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
143                 LOG.info("unlocking next config");
144                 latchForNext.countDown();
145                 return Futures.immediateFuture(Boolean.TRUE);
146             }
147             });
148
149         reactor.syncup(fcNodePath, first);
150         latchForNext.await();
151         mockSyncupWithEntry(second);
152         reactor.syncup(fcNodePath, second);
153
154         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
155         if (!terminated) {
156             LOG.info("thread pool not terminated.");
157             syncThreadPool.shutdownNow();
158         }
159         final InOrder inOrder = Mockito.inOrder(delegate);
160         inOrder.verify(delegate).syncup(fcNodePath, first);
161         inOrder.verify(delegate).syncup(fcNodePath, second);
162         inOrder.verifyNoMoreInteractions();
163     }
164
165     @Test
166     public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
167         final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
168         final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
169         final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
170         final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
171         final CountDownLatch latchForFirst = new CountDownLatch(1);
172         final CountDownLatch latchForNext = new CountDownLatch(1);
173
174         final SyncupEntry first = new SyncupEntry(configAfter, configDS, configBefore, configDS);
175         final SyncupEntry second = new SyncupEntry(configActual, configDS, freshOperational, operationalDS);
176
177         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
178                 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
179             @Override
180             public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
181                 LOG.info("unlocking for fresh operational");
182                 latchForNext.countDown();
183                 latchForFirst.await();
184                 LOG.info("unlocking first delegate");
185                 return Futures.immediateFuture(Boolean.TRUE);
186             }
187         });
188
189         reactor.syncup(fcNodePath, first);
190         latchForNext.await();
191
192         mockSyncupWithEntry(second);
193         reactor.syncup(fcNodePath, second);
194         latchForFirst.countDown();
195
196         syncThreadPool.shutdown();
197         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
198         if (!terminated) {
199             LOG.info("thread pool not terminated.");
200             syncThreadPool.shutdownNow();
201         }
202         Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, second);
203     }
204
205     private void mockSyncupWithEntry(final SyncupEntry entry) throws InterruptedException {
206         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(entry)))
207                 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
208     }
209
210     @After
211     public void tearDown() {
212         syncThreadPool.shutdownNow();
213     }
214 }