Ditch ListeningExecutorService
[bgpcep.git] / programming / impl / src / test / java / org / opendaylight / bgpcep / programming / impl / ProgrammingServiceImplTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.programming.impl;
9
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.MatcherAssert.assertThat;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertThrows;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.opendaylight.protocol.util.CheckTestUtil.checkNotPresentOperational;
18 import static org.opendaylight.protocol.util.CheckTestUtil.checkPresentOperational;
19
20 import com.google.common.util.concurrent.ListenableFuture;
21 import io.netty.util.HashedWheelTimer;
22 import io.netty.util.Timer;
23 import java.math.BigInteger;
24 import java.util.Arrays;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.stream.Collectors;
28 import org.junit.After;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.junit.runner.RunWith;
32 import org.mockito.junit.MockitoJUnitRunner;
33 import org.opendaylight.bgpcep.programming.NanotimeUtil;
34 import org.opendaylight.bgpcep.programming.spi.Instruction;
35 import org.opendaylight.bgpcep.programming.spi.SchedulerException;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionStatus;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueue;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueKey;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.Nanotime;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.SubmitInstructionInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.InstructionKey;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.status.changed.Details;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.status.changed.DetailsBuilder;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.opendaylight.yangtools.yang.common.Uint64;
54
55 @RunWith(MockitoJUnitRunner.StrictStubs.class)
56 public class ProgrammingServiceImplTest extends AbstractProgrammingTest {
57
58     private static final int INSTRUCTION_DEADLINE_OFFSET_IN_SECONDS = 3;
59     private static final String INSTRUCTIONS_QUEUE_KEY = "test-instraction-queue";
60     private final Timer timer = new HashedWheelTimer();
61     private MockedExecutorWrapper mockedExecutorWrapper;
62     private MockedNotificationServiceWrapper mockedNotificationServiceWrapper;
63     private ProgrammingServiceImpl testedProgrammingService;
64
65     @Before
66     @Override
67     public void setUp() throws Exception {
68         super.setUp();
69         mockedExecutorWrapper = new MockedExecutorWrapper();
70         mockedNotificationServiceWrapper = new MockedNotificationServiceWrapper();
71
72         testedProgrammingService = new ProgrammingServiceImpl(getDataBroker(),
73                 mockedNotificationServiceWrapper.getMockedNotificationService(),
74                 mockedExecutorWrapper.getMockedExecutor(), rpcRegistry, cssp, timer,
75                 INSTRUCTIONS_QUEUE_KEY);
76         singletonService.instantiateServiceInstance();
77     }
78
79     @After
80     public void tearDown() throws Exception {
81         singletonService.closeServiceInstance();
82         testedProgrammingService.close();
83     }
84
85     @Test
86     public void testScheduleInstruction() throws Exception {
87         final SubmitInstructionInput mockedSubmit = getMockedSubmitInstructionInput("mockedSubmit");
88         testedProgrammingService.scheduleInstruction(mockedSubmit);
89
90         checkPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit.getId()));
91
92         // assert Schedule to executor
93         mockedExecutorWrapper.assertSubmittedTasksSize(1);
94
95         // assert Notification
96         mockedNotificationServiceWrapper.assertNotificationsCount(1);
97         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(0, mockedSubmit.getId(),
98                 InstructionStatus.Scheduled);
99     }
100
101     @Test
102     public void testScheduleDependingInstruction() throws Exception {
103         testedProgrammingService.scheduleInstruction(getMockedSubmitInstructionInput("mockedSubmit1"));
104         final SubmitInstructionInput mockedSubmit2 = getMockedSubmitInstructionInput("mockedSubmit2",
105                 "mockedSubmit1");
106         testedProgrammingService.scheduleInstruction(mockedSubmit2);
107
108         mockedExecutorWrapper.assertSubmittedTasksSize(2);
109
110         // First is in state scheduled, so second could not be scheduled yet
111         mockedNotificationServiceWrapper.assertNotificationsCount(1);
112     }
113
114     @Test
115     public void testScheduleDependingInstructionToFail() throws Exception {
116         final var insn = getMockedSubmitInstructionInput("mockedSubmit", "dep1");
117         final var ex = assertThrows(SchedulerException.class, () -> testedProgrammingService.scheduleInstruction(insn));
118         assertThat(ex.getMessage(), containsString("Unknown dependency ID"));
119         mockedNotificationServiceWrapper.assertNotificationsCount(0);
120     }
121
122     @Test
123     public void testCancelInstruction() throws Exception {
124         final SubmitInstructionInput mockedSubmit = getMockedSubmitInstructionInput("mockedSubmit");
125         testedProgrammingService.scheduleInstruction(mockedSubmit);
126         checkPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit.getId()));
127
128         final CancelInstructionInput mockedCancel = getCancelInstruction("mockedSubmit");
129         testedProgrammingService.cancelInstruction(mockedCancel);
130
131         checkPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit.getId()));
132         mockedExecutorWrapper.assertSubmittedTasksSize(2);
133
134         mockedNotificationServiceWrapper.assertNotificationsCount(2);
135         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(1, mockedSubmit.getId(),
136                 InstructionStatus.Cancelled);
137     }
138
139     @Test
140     public void testCancelDependantInstruction() throws Exception {
141         final SubmitInstructionInput mockedSubmit1 = getMockedSubmitInstructionInput("mockedSubmit1");
142         testedProgrammingService.scheduleInstruction(mockedSubmit1);
143         final SubmitInstructionInput mockedSubmit2 = getMockedSubmitInstructionInput("mockedSubmit2",
144                 "mockedSubmit1");
145         testedProgrammingService.scheduleInstruction(mockedSubmit2);
146         final SubmitInstructionInput mockedSubmit3 = getMockedSubmitInstructionInput("mockedSubmit3",
147                 "mockedSubmit1", "mockedSubmit2");
148         testedProgrammingService.scheduleInstruction(mockedSubmit3);
149
150         testedProgrammingService.cancelInstruction(getCancelInstruction("mockedSubmit1"));
151
152         mockedNotificationServiceWrapper
153                 .assertNotificationsCount(1 /*First Scheduled*/ + 3 /*First and all dependencies cancelled*/);
154         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(0, mockedSubmit1.getId(),
155                 InstructionStatus.Scheduled);
156         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(1, mockedSubmit1.getId(),
157                 InstructionStatus.Cancelled);
158         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(2, mockedSubmit2.getId(),
159                 InstructionStatus.Cancelled);
160         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(3, mockedSubmit3.getId(),
161                 InstructionStatus.Cancelled);
162
163         checkPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit1.getId()));
164         checkPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit2.getId()));
165         checkPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit3.getId()));
166     }
167
168     @Test
169     public void testCleanInstructions() throws Exception {
170         final SubmitInstructionInput mockedSubmit1 = getMockedSubmitInstructionInput("mockedSubmit1");
171         testedProgrammingService.scheduleInstruction(mockedSubmit1);
172         final SubmitInstructionInput mockedSubmit2 = getMockedSubmitInstructionInput("mockedSubmit2",
173                 "mockedSubmit1");
174         testedProgrammingService.scheduleInstruction(mockedSubmit2);
175
176         final CleanInstructionsInputBuilder cleanInstructionsInputBuilder = new CleanInstructionsInputBuilder();
177         final CleanInstructionsInput cleanInstructionsInput = cleanInstructionsInputBuilder.setId(
178                 Set.of(mockedSubmit1.getId(), mockedSubmit2.getId())).build();
179
180         ListenableFuture<RpcResult<CleanInstructionsOutput>> cleanedInstructionOutput = testedProgrammingService
181                 .cleanInstructions(cleanInstructionsInput);
182
183         assertCleanInstructionOutput(cleanedInstructionOutput, 2);
184
185         testedProgrammingService.cancelInstruction(getCancelInstruction("mockedSubmit1"));
186
187         cleanedInstructionOutput = testedProgrammingService.cleanInstructions(cleanInstructionsInput);
188         assertCleanInstructionOutput(cleanedInstructionOutput, 0);
189
190         checkNotPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit1.getId()));
191         checkNotPresentOperational(getDataBroker(), buildInstructionIID(mockedSubmit2.getId()));
192     }
193
194     private static void assertCleanInstructionOutput(final ListenableFuture<RpcResult<CleanInstructionsOutput>>
195             cleanedInstructionOutput, final int unflushedCount) throws InterruptedException,
196             java.util.concurrent.ExecutionException {
197         if (unflushedCount == 0) {
198             final Set<InstructionId> unflushed = cleanedInstructionOutput.get().getResult().getUnflushed();
199             assertTrue(unflushed == null || unflushed.isEmpty());
200         } else {
201             assertEquals(unflushedCount, cleanedInstructionOutput.get().getResult().getUnflushed().size());
202         }
203         assertEquals(0, cleanedInstructionOutput.get().getErrors().size());
204     }
205
206     @Test
207     public void testCloseProgrammingService() throws Exception {
208         final SubmitInstructionInput mockedSubmit1 = getMockedSubmitInstructionInput("mockedSubmit1");
209         testedProgrammingService.scheduleInstruction(mockedSubmit1);
210         final SubmitInstructionInput mockedSubmit2 = getMockedSubmitInstructionInput("mockedSubmit2",
211                 "mockedSubmit1");
212         testedProgrammingService.scheduleInstruction(mockedSubmit2);
213
214         testedProgrammingService.close();
215
216         mockedNotificationServiceWrapper
217                 .assertNotificationsCount(1/* First scheduled */ + 2/* Both cancelled at close */);
218     }
219
220     @Test(timeout = 30 * 1000)
221     public void testTimeoutWhileScheduledTransaction() throws Exception {
222         final BigInteger deadlineOffset = BigInteger.valueOf(
223                 1000L * 1000 * 1000 * INSTRUCTION_DEADLINE_OFFSET_IN_SECONDS /* seconds */);
224         final Nanotime current = NanotimeUtil.currentTime();
225         final Nanotime deadlineNano = new Nanotime(Uint64.valueOf(current.getValue().toJava().add(deadlineOffset)));
226
227         final Optional<Nanotime> deadline = Optional.of(deadlineNano);
228         final SubmitInstructionInput mockedSubmit1 = getMockedSubmitInstructionInput("mockedSubmit1", deadline);
229         final ListenableFuture<Instruction> future = testedProgrammingService.scheduleInstruction(mockedSubmit1);
230
231         mockedNotificationServiceWrapper.assertNotificationsCount(1);
232
233         future.get();
234
235         mockedNotificationServiceWrapper.assertNotificationsCount(2);
236         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(1, mockedSubmit1.getId(),
237                 InstructionStatus.Cancelled);
238     }
239
240     @Test(timeout = 30 * 1000)
241     public void testTimeoutWhileSuccessfulTransaction() throws Exception {
242         final BigInteger deadlineOffset = BigInteger.valueOf(
243                 1000L * 1000 * 1000 * INSTRUCTION_DEADLINE_OFFSET_IN_SECONDS /* seconds */);
244         final Nanotime current = NanotimeUtil.currentTime();
245         final Nanotime deadlineNano = new Nanotime(Uint64.valueOf(current.getValue().toJava().add(deadlineOffset)));
246
247         final Optional<Nanotime> deadline = Optional.of(deadlineNano);
248         final SubmitInstructionInput mockedSubmit1 = getMockedSubmitInstructionInput("mockedSubmit1", deadline);
249         final ListenableFuture<Instruction> future = testedProgrammingService.scheduleInstruction(mockedSubmit1);
250
251         mockedNotificationServiceWrapper.assertNotificationsCount(1);
252
253         final Instruction i = future.get();
254         i.checkedExecutionStart();
255         i.executionCompleted(InstructionStatus.Successful, getDetails());
256
257         mockedNotificationServiceWrapper.assertNotificationsCount(3);
258         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(1, mockedSubmit1.getId(),
259                 InstructionStatus.Executing);
260         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(2, mockedSubmit1.getId(),
261                 InstructionStatus.Successful);
262         // Timeout in success should not do anything
263     }
264
265     @Test(timeout = 30 * 1000)
266     public void testTimeoutWhileExecutingWithDependenciesTransaction() throws Exception {
267         final BigInteger deadlineOffset = BigInteger.valueOf(
268                 1000L * 1000 * 1000 * INSTRUCTION_DEADLINE_OFFSET_IN_SECONDS /* seconds */);
269         final Nanotime current = NanotimeUtil.currentTime();
270         final Nanotime deadlineNano = new Nanotime(Uint64.valueOf(current.getValue().toJava().add(deadlineOffset)));
271
272         final Optional<Nanotime> deadline = Optional.of(deadlineNano);
273         final SubmitInstructionInput mockedSubmit1 = getMockedSubmitInstructionInput("mockedSubmit1", deadline);
274         final ListenableFuture<Instruction> future = testedProgrammingService.scheduleInstruction(mockedSubmit1);
275
276         final SubmitInstructionInput mockedSubmit2 = getMockedSubmitInstructionInput("mockedSubmit2",
277                 "mockedSubmit1");
278         testedProgrammingService.scheduleInstruction(mockedSubmit2);
279
280         mockedNotificationServiceWrapper.assertNotificationsCount(1);
281
282         final Instruction i = future.get();
283         i.checkedExecutionStart();
284
285         mockedNotificationServiceWrapper.assertNotificationsCount(4);
286         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(1, mockedSubmit1.getId(),
287                 InstructionStatus.Executing);
288         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(2, mockedSubmit1.getId(),
289                 InstructionStatus.Unknown);
290         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(3, mockedSubmit2.getId(),
291                 InstructionStatus.Cancelled);
292     }
293
294     // TODO test deadline with state Queued
295
296     @Test
297     public void testSuccessExecutingWithDependenciesTransaction() throws Exception {
298         final SubmitInstructionInput mockedSubmit1 = getMockedSubmitInstructionInput("mockedSubmit1");
299         final ListenableFuture<Instruction> future = testedProgrammingService.scheduleInstruction(mockedSubmit1);
300
301         final SubmitInstructionInput mockedSubmit2 =
302                 getMockedSubmitInstructionInput("mockedSubmit2", "mockedSubmit1");
303         final ListenableFuture<Instruction> future2 = testedProgrammingService.scheduleInstruction(mockedSubmit2);
304
305         mockedNotificationServiceWrapper.assertNotificationsCount(1);
306
307         Instruction instruction = future.get();
308         instruction.checkedExecutionStart();
309         instruction.executionCompleted(InstructionStatus.Successful, getDetails());
310
311         mockedNotificationServiceWrapper.assertNotificationsCount(4);
312         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(1, mockedSubmit1.getId(),
313                 InstructionStatus.Executing);
314         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(2, mockedSubmit1.getId(),
315                 InstructionStatus.Successful);
316         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(3, mockedSubmit2.getId(),
317                 InstructionStatus.Scheduled);
318
319         instruction = future2.get();
320         instruction.checkedExecutionStart();
321         instruction.executionCompleted(InstructionStatus.Successful, getDetails());
322
323         mockedNotificationServiceWrapper.assertNotificationsCount(6);
324         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(4, mockedSubmit2.getId(),
325                 InstructionStatus.Executing);
326         mockedNotificationServiceWrapper.assertInstructionStatusChangedNotification(5, mockedSubmit2.getId(),
327                 InstructionStatus.Successful);
328     }
329
330     private static Details getDetails() {
331         return new DetailsBuilder().build();
332     }
333
334     private static SubmitInstructionInput getMockedSubmitInstructionInput(final String id,
335             final String... dependencyIds) {
336         return getMockedSubmitInstructionInput(id, Optional.empty(), dependencyIds);
337     }
338
339     private static SubmitInstructionInput getMockedSubmitInstructionInput(final String id,
340             final Optional<Nanotime> deadline, final String... dependencyIds) {
341         final SubmitInstructionInput mockedSubmitInstruction = mock(SubmitInstructionInput.class);
342
343         final Set<InstructionId> dependencies = Arrays.stream(dependencyIds)
344             .map(InstructionId::new)
345             .collect(Collectors.toUnmodifiableSet());
346
347         doReturn(dependencies).when(mockedSubmitInstruction).getPreconditions();
348         doReturn(new InstructionId(id)).when(mockedSubmitInstruction).getId();
349         doReturn(deadline.orElseGet(() -> new Nanotime(Uint64.valueOf(Long.MAX_VALUE))))
350                 .when(mockedSubmitInstruction).getDeadline();
351         return mockedSubmitInstruction;
352     }
353
354     private static CancelInstructionInput getCancelInstruction(final String instructionId) {
355         final CancelInstructionInputBuilder builder = new CancelInstructionInputBuilder();
356         builder.setId(new InstructionId(instructionId));
357         return builder.build();
358     }
359
360     private static KeyedInstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming
361             .rev150720.instruction.queue.Instruction, InstructionKey> buildInstructionIID(final InstructionId id) {
362         return InstanceIdentifier.builder(InstructionsQueue.class, new InstructionsQueueKey(INSTRUCTIONS_QUEUE_KEY))
363                 .build().child(org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720
364                         .instruction.queue.Instruction.class, new InstructionKey(id));
365     }
366 }