2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frsync.impl;
10 import com.google.common.util.concurrent.Futures;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.ListeningExecutorService;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.TimeUnit;
21 import org.junit.After;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.junit.runner.RunWith;
25 import org.mockito.ArgumentMatchers;
26 import org.mockito.InOrder;
27 import org.mockito.Mock;
28 import org.mockito.Mockito;
29 import org.mockito.junit.MockitoJUnitRunner;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
32 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Test for {@link SyncReactorFutureZipDecorator}.
45 @RunWith(MockitoJUnitRunner.class)
46 public class SyncReactorFutureZipDecoratorTest {
48 private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
49 private static final NodeId NODE_ID = new NodeId("testNode");
50 private SyncReactorFutureZipDecorator reactor;
51 private InstanceIdentifier<FlowCapableNode> fcNodePath;
52 private ListeningExecutorService syncThreadPool;
53 private final LogicalDatastoreType configDS = LogicalDatastoreType.CONFIGURATION;
54 private final LogicalDatastoreType operationalDS = LogicalDatastoreType.OPERATIONAL;
57 private SyncReactor delegate;
59 private SyncupEntry syncupEntry;
63 final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
65 .setNameFormat("frsync-test-%d")
66 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
68 syncThreadPool = MoreExecutors.listeningDecorator(executorService);
69 reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
70 fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
71 .augmentation(FlowCapableNode.class);
75 public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
76 final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
77 final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
78 final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
79 final CountDownLatch latchForFirst = new CountDownLatch(1);
80 final CountDownLatch latchForNext = new CountDownLatch(1);
82 final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, operationalDS);
83 final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
84 final SyncupEntry third = new SyncupEntry(null, configDS, dataAfter, configDS);
85 final SyncupEntry fourth = new SyncupEntry(dataAfter2, configDS, null, configDS);
86 final SyncupEntry zipped = new SyncupEntry(dataAfter2, configDS, dataBefore, configDS);
87 final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
89 Mockito.when(delegate.syncup(ArgumentMatchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
90 .thenAnswer(invocationOnMock -> {
91 LOG.info("unlocking next configs");
92 latchForNext.countDown();
93 latchForFirst.await();
94 LOG.info("unlocking first delegate");
95 return Futures.immediateFuture(Boolean.TRUE);
98 allResults.add(reactor.syncup(fcNodePath, first));
101 mockSyncupWithEntry(second);
102 allResults.add(reactor.syncup(fcNodePath, second));
103 mockSyncupWithEntry(third);
104 allResults.add(reactor.syncup(fcNodePath, third));
105 mockSyncupWithEntry(fourth);
106 allResults.add(reactor.syncup(fcNodePath, fourth));
107 latchForFirst.countDown();
109 Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
110 LOG.info("all configs done");
112 syncThreadPool.shutdown();
113 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
115 LOG.info("thread pool not terminated.");
116 syncThreadPool.shutdownNow();
118 final InOrder inOrder = Mockito.inOrder(delegate);
119 inOrder.verify(delegate).syncup(fcNodePath, first);
120 inOrder.verify(delegate).syncup(fcNodePath, zipped);
121 inOrder.verifyNoMoreInteractions();
125 public void testSyncupConfigEmptyQueue() throws Exception {
126 final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
127 final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
128 final CountDownLatch latchForNext = new CountDownLatch(1);
130 final SyncupEntry first = new SyncupEntry(dataBefore, configDS, null, configDS);
131 final SyncupEntry second = new SyncupEntry(dataAfter, configDS, dataBefore, configDS);
133 Mockito.when(delegate.syncup(ArgumentMatchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
134 .thenAnswer(invocationOnMock -> {
135 LOG.info("unlocking next config");
136 latchForNext.countDown();
137 return Futures.immediateFuture(Boolean.TRUE);
140 reactor.syncup(fcNodePath, first);
141 latchForNext.await();
142 mockSyncupWithEntry(second);
143 reactor.syncup(fcNodePath, second);
145 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
147 LOG.info("thread pool not terminated.");
148 syncThreadPool.shutdownNow();
150 final InOrder inOrder = Mockito.inOrder(delegate);
151 inOrder.verify(delegate).syncup(fcNodePath, first);
152 inOrder.verify(delegate).syncup(fcNodePath, second);
153 inOrder.verifyNoMoreInteractions();
157 public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
158 final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
159 final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
160 final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
161 final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
162 final CountDownLatch latchForFirst = new CountDownLatch(1);
163 final CountDownLatch latchForNext = new CountDownLatch(1);
165 final SyncupEntry first = new SyncupEntry(configAfter, configDS, configBefore, configDS);
166 final SyncupEntry second = new SyncupEntry(configActual, configDS, freshOperational, operationalDS);
168 Mockito.when(delegate.syncup(ArgumentMatchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(first)))
169 .thenAnswer(invocationOnMock -> {
170 LOG.info("unlocking for fresh operational");
171 latchForNext.countDown();
172 latchForFirst.await();
173 LOG.info("unlocking first delegate");
174 return Futures.immediateFuture(Boolean.TRUE);
177 reactor.syncup(fcNodePath, first);
178 latchForNext.await();
180 mockSyncupWithEntry(second);
181 reactor.syncup(fcNodePath, second);
182 latchForFirst.countDown();
184 syncThreadPool.shutdown();
185 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
187 LOG.info("thread pool not terminated.");
188 syncThreadPool.shutdownNow();
190 Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, second);
193 private void mockSyncupWithEntry(final SyncupEntry entry) {
194 Mockito.when(delegate.syncup(ArgumentMatchers.any(), Mockito.eq(entry)))
195 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
199 public void tearDown() {
200 syncThreadPool.shutdownNow();