Ditch ListeningExecutorService 07/106807/4
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 4 Jul 2023 19:08:15 +0000 (21:08 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 5 Jul 2023 15:20:12 +0000 (17:20 +0200)
We have Futures.submit() at our disposal, hence we can use plain
Executor/ExecutorService to manage our tasks.

Change-Id: I18c26b52603dc6cca11b8feb42f8ac0c6f62bf14
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/DefaultInstructionSchedulerFactory.java
programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java
programming/impl/src/test/java/org/opendaylight/bgpcep/programming/impl/MockedExecutorWrapper.java
programming/impl/src/test/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImplTest.java

index 3921c62967765fb0162b1923e76e1035d08d412a..c59fce7fe56fb5fb4a40afb38158130b4619f793 100644 (file)
@@ -9,9 +9,8 @@ package org.opendaylight.bgpcep.programming.impl;
 
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.Timer;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -39,7 +38,7 @@ public final class DefaultInstructionSchedulerFactory implements InstructionSche
     private final Timer timer;
     private final RpcProviderService rpcProviderRegistry;
     private final ClusterSingletonServiceProvider cssp;
-    private final ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+    private final ExecutorService exec = Executors.newSingleThreadExecutor();
 
     @Inject
     @Activate
index a3be2d1e6b462d00ed65b3de60ffb0a0df3dfcd3..6d478acbbdc9d2fc47acc516de774e3a289cec08 100644 (file)
@@ -13,7 +13,6 @@ import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.Timeout;
@@ -26,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.bgpcep.programming.NanotimeUtil;
@@ -80,7 +80,7 @@ final class ProgrammingServiceImpl implements ClusterSingletonService, Instructi
     private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
     private final InstanceIdentifier<InstructionsQueue> qid;
     private final NotificationPublishService notifs;
-    private final ListeningExecutorService executor;
+    private final Executor executor;
     private final DataBroker dataProvider;
     private final Timer timer;
     private final String instructionId;
@@ -154,7 +154,7 @@ final class ProgrammingServiceImpl implements ClusterSingletonService, Instructi
     }
 
     ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationPublishService notifs,
-            final ListeningExecutorService executor, final RpcProviderService rpcProviderRegistry,
+            final Executor executor, final RpcProviderService rpcProviderRegistry,
             final ClusterSingletonServiceProvider cssp, final Timer timer, final String instructionId) {
         this.dataProvider = requireNonNull(dataProvider);
         this.instructionId = requireNonNull(instructionId);
@@ -197,12 +197,12 @@ final class ProgrammingServiceImpl implements ClusterSingletonService, Instructi
 
     @Override
     public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
-        return executor.submit(() -> realCancelInstruction(input));
+        return Futures.submit(() -> realCancelInstruction(input), executor);
     }
 
     @Override
     public ListenableFuture<RpcResult<CleanInstructionsOutput>> cleanInstructions(final CleanInstructionsInput input) {
-        return executor.submit(() -> realCleanInstructions(input));
+        return Futures.submit(() -> realCleanInstructions(input), executor);
     }
 
     private synchronized RpcResult<CancelInstructionOutput> realCancelInstruction(final CancelInstructionInput input) {
@@ -363,7 +363,7 @@ final class ProgrammingServiceImpl implements ClusterSingletonService, Instructi
          * This task should be ingress-weighed, so we reinsert it into the
          * same execution service.
          */
-        executor.submit(() -> tryScheduleInstruction(instruction));
+        executor.execute(() -> tryScheduleInstruction(instruction));
 
         return ret;
     }
index bbcff5388828108be9e345c7c72ba692bb4f3c54..a622e918d24df4d6d4f8fde20312efd8d8e4bff2 100644 (file)
@@ -8,49 +8,26 @@
 package org.opendaylight.bgpcep.programming.impl;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import org.mockito.stubbing.Answer;
+import java.util.concurrent.Executor;
 
 final class MockedExecutorWrapper {
-
-    private final List<Object> submittedTasksToExecutor;
+    private final List<Runnable> submittedTasksToExecutor;
 
     MockedExecutorWrapper() {
-        this.submittedTasksToExecutor = new ArrayList<>();
+        submittedTasksToExecutor = new ArrayList<>();
     }
 
-    ListeningExecutorService getMockedExecutor() {
-        final ListeningExecutorService mockedExecutor = mock(ListeningExecutorService.class);
-        final Answer<ListenableFuture<?>> submitAnswer = invocation -> {
-            final Object task = invocation.getArguments()[0];
-            this.submittedTasksToExecutor.add(task);
-
-            Object result = null;
-            if (task instanceof Runnable) {
-                ((Runnable) task).run();
-            } else if (task instanceof Callable) {
-                result = ((Callable<?>) task).call();
-            }
-
-            final ListenableFuture<?> mockedFuture = mock(ListenableFuture.class);
-            doReturn(result).when(mockedFuture).get();
-            return mockedFuture;
+    Executor getMockedExecutor() {
+        return runnable -> {
+            submittedTasksToExecutor.add(runnable);
+            runnable.run();
         };
-        doAnswer(submitAnswer).when(mockedExecutor).submit(any(Runnable.class));
-        doAnswer(submitAnswer).when(mockedExecutor).submit(any(Callable.class));
-        return mockedExecutor;
     }
 
     void assertSubmittedTasksSize(final int taskCount) {
-        assertEquals(taskCount, this.submittedTasksToExecutor.size());
+        assertEquals(taskCount, submittedTasksToExecutor.size());
     }
 }
index bb4f71e2563626ed16433237996f0f5ac5270335..b22c136c6a3268bb4040264aa7148d79f3b6fb6d 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.bgpcep.programming.impl;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.opendaylight.protocol.util.CheckTestUtil.checkNotPresentOperational;
@@ -113,15 +113,10 @@ public class ProgrammingServiceImplTest extends AbstractProgrammingTest {
 
     @Test
     public void testScheduleDependingInstructionToFail() throws Exception {
-        try {
-            testedProgrammingService.scheduleInstruction(getMockedSubmitInstructionInput("mockedSubmit",
-                    "dep1"));
-        } catch (final SchedulerException e) {
-            assertThat(e.getMessage(), containsString("Unknown dependency ID"));
-            mockedNotificationServiceWrapper.assertNotificationsCount(0);
-            return;
-        }
-        fail("Instruction schedule should fail on unresolved dependencies");
+        final var insn = getMockedSubmitInstructionInput("mockedSubmit", "dep1");
+        final var ex = assertThrows(SchedulerException.class, () -> testedProgrammingService.scheduleInstruction(insn));
+        assertThat(ex.getMessage(), containsString("Unknown dependency ID"));
+        mockedNotificationServiceWrapper.assertNotificationsCount(0);
     }
 
     @Test