2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.junit.runner.RunWith;
26 import org.mockito.InOrder;
27 import org.mockito.Matchers;
28 import org.mockito.Mock;
29 import org.mockito.Mockito;
30 import org.mockito.invocation.InvocationOnMock;
31 import org.mockito.runners.MockitoJUnitRunner;
32 import org.mockito.stubbing.Answer;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
35 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
36 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
37 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Test for {@link SyncReactorFutureZipDecorator}.
50 @RunWith(MockitoJUnitRunner.class)
51 public class SyncReactorFutureZipDecoratorTest {
53 private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
54 private static final NodeId NODE_ID = new NodeId("testNode");
55 private SyncReactorFutureZipDecorator reactor;
56 private InstanceIdentifier<FlowCapableNode> fcNodePath;
57 private ListeningExecutorService syncThreadPool;
58 private final LogicalDatastoreType configDS = LogicalDatastoreType.CONFIGURATION;
59 private final LogicalDatastoreType operationalDS = LogicalDatastoreType.OPERATIONAL;
62 private SyncReactor delegate;
64 private SyncupEntry syncupEntry;
68 final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
69 final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
71 .setNameFormat("frsync-test-%d")
72 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
74 syncThreadPool = MoreExecutors.listeningDecorator(executorService);
75 reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool, semaphoreKeeper);
76 fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
77 .augmentation(FlowCapableNode.class);
81 public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
82 final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
83 final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
84 final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
85 final CountDownLatch latchForFirst = new CountDownLatch(1);
86 final CountDownLatch latchForNext = new CountDownLatch(1);
88 final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, operationalDS);
89 final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
90 final SyncupEntry third = new SyncupEntry(null, configDS, dataAfter, configDS);
91 final SyncupEntry fourth = new SyncupEntry(dataAfter2, configDS, null, configDS);
92 final SyncupEntry zipped = new SyncupEntry(dataAfter2, configDS, dataBefore, configDS);
93 final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
95 Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
96 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
98 public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
99 LOG.info("unlocking next configs");
100 latchForNext.countDown();
101 latchForFirst.await();
102 LOG.info("unlocking first delegate");
103 return Futures.immediateFuture(Boolean.TRUE);
107 allResults.add(reactor.syncup(fcNodePath, first));
108 latchForNext.await();
110 mockSyncupWithEntry(second);
111 allResults.add(reactor.syncup(fcNodePath, second));
112 mockSyncupWithEntry(third);
113 allResults.add(reactor.syncup(fcNodePath, third));
114 mockSyncupWithEntry(fourth);
115 allResults.add(reactor.syncup(fcNodePath, fourth));
116 latchForFirst.countDown();
118 Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
119 LOG.info("all configs done");
121 syncThreadPool.shutdown();
122 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
124 LOG.info("thread pool not terminated.");
125 syncThreadPool.shutdownNow();
127 final InOrder inOrder = Mockito.inOrder(delegate);
128 inOrder.verify(delegate).syncup(fcNodePath, first);
129 inOrder.verify(delegate).syncup(fcNodePath, zipped);
130 inOrder.verifyNoMoreInteractions();
134 public void testSyncupConfigEmptyQueue() throws Exception {
135 final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
136 final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
137 final CountDownLatch latchForNext = new CountDownLatch(1);
139 final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, configDS);
140 final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
142 Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
143 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
145 public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
146 LOG.info("unlocking next config");
147 latchForNext.countDown();
148 return Futures.immediateFuture(Boolean.TRUE);
152 reactor.syncup(fcNodePath, first);
153 latchForNext.await();
154 mockSyncupWithEntry(second);
155 reactor.syncup(fcNodePath, second);
157 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
159 LOG.info("thread pool not terminated.");
160 syncThreadPool.shutdownNow();
162 final InOrder inOrder = Mockito.inOrder(delegate);
163 inOrder.verify(delegate).syncup(fcNodePath, first);
164 inOrder.verify(delegate).syncup(fcNodePath, second);
165 inOrder.verifyNoMoreInteractions();
169 public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
170 final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
171 final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
172 final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
173 final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
174 final CountDownLatch latchForFirst = new CountDownLatch(1);
175 final CountDownLatch latchForNext = new CountDownLatch(1);
177 final SyncupEntry first = new SyncupEntry(configAfter, configDS, configBefore, configDS);
178 final SyncupEntry second = new SyncupEntry(configActual, configDS, freshOperational, operationalDS);
180 Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
181 .thenAnswer(new Answer<ListenableFuture<Boolean>>() {
183 public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
184 LOG.info("unlocking for fresh operational");
185 latchForNext.countDown();
186 latchForFirst.await();
187 LOG.info("unlocking first delegate");
188 return Futures.immediateFuture(Boolean.TRUE);
192 reactor.syncup(fcNodePath, first);
193 latchForNext.await();
195 mockSyncupWithEntry(second);
196 reactor.syncup(fcNodePath, second);
197 latchForFirst.countDown();
199 syncThreadPool.shutdown();
200 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
202 LOG.info("thread pool not terminated.");
203 syncThreadPool.shutdownNow();
205 Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, second);
208 private void mockSyncupWithEntry(final SyncupEntry entry) {
209 Mockito.when(delegate.syncup(Matchers.any(), Mockito.eq(entry)))
210 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
214 public void tearDown() {
215 syncThreadPool.shutdownNow();