2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.junit.runner.RunWith;
26 import org.mockito.InOrder;
27 import org.mockito.Matchers;
28 import org.mockito.Mock;
29 import org.mockito.Mockito;
30 import org.mockito.invocation.InvocationOnMock;
31 import org.mockito.runners.MockitoJUnitRunner;
32 import org.mockito.stubbing.Answer;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * Test for {@link SyncReactorFutureZipDecorator}.
46 @RunWith(MockitoJUnitRunner.class)
47 public class SyncReactorFutureZipDecoratorTest {
49 private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
50 private static final NodeId NODE_ID = new NodeId("testNode");
51 private SyncReactorFutureZipDecorator reactor;
52 private InstanceIdentifier<FlowCapableNode> fcNodePath;
53 private ListeningExecutorService syncThreadPool;
56 private SyncReactorGuardDecorator delegate;
60 final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
62 .setNameFormat("frsync-test%d")
63 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
65 syncThreadPool = MoreExecutors.listeningDecorator(executorService);
66 reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
67 fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
68 .augmentation(FlowCapableNode.class);
72 public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
73 final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
74 final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
75 final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
76 final CountDownLatch latchForFirst = new CountDownLatch(1);
77 final CountDownLatch latchForNext = new CountDownLatch(1);
78 final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
80 Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
81 Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
83 public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
84 LOG.info("unlocking next configs");
85 latchForNext.countDown();
86 latchForFirst.await();
87 LOG.info("unlocking first delegate");
88 return Futures.immediateFuture(Boolean.TRUE);
90 }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
92 final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
93 allResults.add(reactor.syncup(fcNodePath, dataBefore, null, dsType));
96 allResults.add(reactor.syncup(fcNodePath, dataAfter, dataBefore, dsType));
97 allResults.add(reactor.syncup(fcNodePath, null, dataAfter, dsType));
98 allResults.add(reactor.syncup(fcNodePath, dataAfter2, null, dsType));
99 latchForFirst.countDown();
101 Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
102 LOG.info("all configs done");
104 syncThreadPool.shutdown();
105 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
107 LOG.info("thread pool not terminated.");
108 syncThreadPool.shutdownNow();
110 final InOrder inOrder = Mockito.inOrder(delegate);
111 inOrder.verify(delegate).syncup(fcNodePath, dataBefore, null, dsType);
112 inOrder.verify(delegate).syncup(fcNodePath, dataAfter2, dataBefore, dsType);
113 inOrder.verifyNoMoreInteractions();
117 public void testSyncupConfigEmptyQueue() throws Exception {
118 final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
119 final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
120 final CountDownLatch latchForNext = new CountDownLatch(1);
121 final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
123 Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
124 Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
126 public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
127 LOG.info("unlocking next config");
128 latchForNext.countDown();
129 return Futures.immediateFuture(Boolean.TRUE);
131 }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
133 reactor.syncup(fcNodePath, dataBefore, null, dsType);
134 latchForNext.await();
135 reactor.syncup(fcNodePath, dataAfter, dataBefore, dsType);
137 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
139 LOG.info("thread pool not terminated.");
140 syncThreadPool.shutdownNow();
142 final InOrder inOrder = Mockito.inOrder(delegate);
143 inOrder.verify(delegate).syncup(fcNodePath, dataBefore, null, dsType);
144 inOrder.verify(delegate).syncup(fcNodePath, dataAfter, dataBefore, dsType);
145 inOrder.verifyNoMoreInteractions();
150 public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
151 final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
152 final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
153 final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
154 final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
155 final CountDownLatch latchForFirst = new CountDownLatch(1);
156 final CountDownLatch latchForNext = new CountDownLatch(1);
158 Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
159 Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
161 public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
162 LOG.info("unlocking for fresh operational");
163 latchForNext.countDown();
164 latchForFirst.await();
165 LOG.info("unlocking first delegate");
166 return Futures.immediateFuture(Boolean.TRUE);
168 }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
170 reactor.syncup(fcNodePath, configAfter, configBefore, LogicalDatastoreType.CONFIGURATION);
171 latchForNext.await();
173 reactor.syncup(fcNodePath, configActual, freshOperational, LogicalDatastoreType.OPERATIONAL);
174 latchForFirst.countDown();
176 syncThreadPool.shutdown();
177 boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
179 LOG.info("thread pool not terminated.");
180 syncThreadPool.shutdownNow();
182 Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, configActual, freshOperational, LogicalDatastoreType.OPERATIONAL);
186 public void tearDown() {
187 syncThreadPool.shutdownNow();