Bug 6745 Improve compression queue locking and handle InterruptedException
[openflowplugin.git] / applications / forwardingrules-sync / src / test / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecoratorTest.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.junit.runner.RunWith;
26 import org.mockito.InOrder;
27 import org.mockito.Matchers;
28 import org.mockito.Mock;
29 import org.mockito.Mockito;
30 import org.mockito.invocation.InvocationOnMock;
31 import org.mockito.runners.MockitoJUnitRunner;
32 import org.mockito.stubbing.Answer;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
35 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
36 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
37 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Test for {@link SyncReactorFutureZipDecorator}.
49  */
50 @RunWith(MockitoJUnitRunner.class)
51 public class SyncReactorFutureZipDecoratorTest {
52
53     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
54     private static final NodeId NODE_ID = new NodeId("testNode");
55     private SyncReactorFutureZipDecorator reactor;
56     private InstanceIdentifier<FlowCapableNode> fcNodePath;
57     private ListeningExecutorService syncThreadPool;
58     private final LogicalDatastoreType configDS = LogicalDatastoreType.CONFIGURATION;
59     private final LogicalDatastoreType operationalDS = LogicalDatastoreType.OPERATIONAL;
60
61     @Mock
62     private SyncReactor delegate;
63     @Mock
64     private SyncupEntry syncupEntry;
65
66     @Before
67     public void setUp() {
68         final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
69         final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
70                 .setDaemon(false)
71                 .setNameFormat("frsync-test-%d")
72                 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
73                 .build());
74         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
75         reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool, semaphoreKeeper);
76         fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
77                 .augmentation(FlowCapableNode.class);
78     }
79
80     @Test
81     public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
82         final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
83         final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
84         final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
85         final CountDownLatch latchForFirst = new CountDownLatch(1);
86         final CountDownLatch latchForNext = new CountDownLatch(1);
87
88         final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, operationalDS);
89         final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
90         final SyncupEntry third = new SyncupEntry(null, configDS, dataAfter, configDS);
91         final SyncupEntry fourth = new SyncupEntry(dataAfter2, configDS, null, configDS);
92         final SyncupEntry zipped = new SyncupEntry(dataAfter2, configDS, dataBefore, configDS);
93         final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
94
95         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
96                 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
97                     @Override
98                     public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
99                         LOG.info("unlocking next configs");
100                         latchForNext.countDown();
101                         latchForFirst.await();
102                         LOG.info("unlocking first delegate");
103                         return Futures.immediateFuture(Boolean.TRUE);
104                     }
105                 });
106
107         allResults.add(reactor.syncup(fcNodePath, first));
108         latchForNext.await();
109
110         mockSyncupWithEntry(second);
111         allResults.add(reactor.syncup(fcNodePath, second));
112         mockSyncupWithEntry(third);
113         allResults.add(reactor.syncup(fcNodePath, third));
114         mockSyncupWithEntry(fourth);
115         allResults.add(reactor.syncup(fcNodePath, fourth));
116         latchForFirst.countDown();
117
118         Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
119         LOG.info("all configs done");
120
121         syncThreadPool.shutdown();
122         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
123         if (!terminated) {
124             LOG.info("thread pool not terminated.");
125             syncThreadPool.shutdownNow();
126         }
127         final InOrder inOrder = Mockito.inOrder(delegate);
128         inOrder.verify(delegate).syncup(fcNodePath, first);
129         inOrder.verify(delegate).syncup(fcNodePath, zipped);
130         inOrder.verifyNoMoreInteractions();
131     }
132
133     @Test
134     public void testSyncupConfigEmptyQueue() throws Exception {
135         final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
136         final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
137         final CountDownLatch latchForNext = new CountDownLatch(1);
138
139         final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, configDS);
140         final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
141
142         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
143                 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
144             @Override
145             public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
146                 LOG.info("unlocking next config");
147                 latchForNext.countDown();
148                 return Futures.immediateFuture(Boolean.TRUE);
149             }
150             });
151
152         reactor.syncup(fcNodePath, first);
153         latchForNext.await();
154         mockSyncupWithEntry(second);
155         reactor.syncup(fcNodePath, second);
156
157         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
158         if (!terminated) {
159             LOG.info("thread pool not terminated.");
160             syncThreadPool.shutdownNow();
161         }
162         final InOrder inOrder = Mockito.inOrder(delegate);
163         inOrder.verify(delegate).syncup(fcNodePath, first);
164         inOrder.verify(delegate).syncup(fcNodePath, second);
165         inOrder.verifyNoMoreInteractions();
166     }
167
168     @Test
169     public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
170         final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
171         final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
172         final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
173         final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
174         final CountDownLatch latchForFirst = new CountDownLatch(1);
175         final CountDownLatch latchForNext = new CountDownLatch(1);
176
177         final SyncupEntry first = new SyncupEntry(configAfter, configDS, configBefore, configDS);
178         final SyncupEntry second = new SyncupEntry(configActual, configDS, freshOperational, operationalDS);
179
180         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
181                 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
182             @Override
183             public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
184                 LOG.info("unlocking for fresh operational");
185                 latchForNext.countDown();
186                 latchForFirst.await();
187                 LOG.info("unlocking first delegate");
188                 return Futures.immediateFuture(Boolean.TRUE);
189             }
190         });
191
192         reactor.syncup(fcNodePath, first);
193         latchForNext.await();
194
195         mockSyncupWithEntry(second);
196         reactor.syncup(fcNodePath, second);
197         latchForFirst.countDown();
198
199         syncThreadPool.shutdown();
200         boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
201         if (!terminated) {
202             LOG.info("thread pool not terminated.");
203             syncThreadPool.shutdownNow();
204         }
205         Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, second);
206     }
207
208     private void mockSyncupWithEntry(final SyncupEntry entry) {
209         Mockito.when(delegate.syncup(Matchers.any(), Mockito.eq(entry)))
210                 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
211     }
212
213     @After
214     public void tearDown() {
215         syncThreadPool.shutdownNow();
216     }
217 }