Ditch blueprint from frm-sync 68/110168/2
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 7 Feb 2024 07:44:42 +0000 (08:44 +0100)
committerRobert Varga <nite@hq.sk>
Wed, 7 Feb 2024 08:28:10 +0000 (08:28 +0000)
forwarding-rules-sync has an extremely simplistic blueprint, replace it
with Declarative Services wiring.

JIRA: OPNFLWPLUG-1112
Change-Id: I35d9add3e1b861cdd59000ed9ba0e395f706e6a6
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
applications/forwardingrules-sync/pom.xml
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java
applications/forwardingrules-sync/src/main/resources/OSGI-INF/blueprint/forwardingrules-sync.xml [deleted file]
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java

index 6ff0d095d8902d7d5ad713a85075e997face7bb8..5266aa91f1fd585d472b3e38d77af0181b796368 100644 (file)
     <packaging>bundle</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.guicedee.services</groupId>
+            <artifactId>javax.inject</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.annotation</groupId>
+            <artifactId>jakarta.annotation-api</artifactId>
+            <optional>true</optional>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.mdsal</groupId>
             <artifactId>mdsal-binding-api</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-singleton-common-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.openflowplugin.model</groupId>
             <artifactId>model-inventory</artifactId>
         </dependency>
-
         <dependency>
             <groupId>org.opendaylight.openflowplugin.model</groupId>
             <artifactId>model-flow-service</artifactId>
         </dependency>
-
-        <dependency>
-            <groupId>org.opendaylight.yangtools</groupId>
-            <artifactId>yang-common</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.opendaylight.yangtools</groupId>
             <artifactId>concepts</artifactId>
         </dependency>
-
         <dependency>
-            <groupId>org.opendaylight.mdsal</groupId>
-            <artifactId>mdsal-singleton-common-api</artifactId>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-common</artifactId>
         </dependency>
-
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.component.annotations</artifactId>
         </dependency>
 
         <dependency>
             <groupId>org.opendaylight.mdsal</groupId>
             <artifactId>mdsal-binding-test-utils</artifactId>
         </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.sonarsource.java</groupId>
             <artifactId>sonar-jacoco-listeners</artifactId>
         </dependency>
     </dependencies>
 
-    <build>
-        <resources>
-            <resource>
-                <filtering>true</filtering>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-
-        <plugins>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-
     <scm>
         <connection>scm:git:ssh://git.opendaylight.org:29418/openflowplugin.git</connection>
         <developerConnection>scm:git:ssh://git.opendaylight.org:29418/openflowplugin.git</developerConnection>
         <url>https://wiki.opendaylight.org/view/OpenDaylight_OpenFlow_Plugin:Main</url>
         <tag>HEAD</tag>
     </scm>
-
 </project>
index 537d16f38b861df9d02cac421320f2332a76dddb..b4db87d40da7d66206c2020ae827de6bcc43416f 100644 (file)
@@ -9,21 +9,19 @@ package org.opendaylight.openflowplugin.applications.frsync.impl;
 
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
-import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
 import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
@@ -36,14 +34,19 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Top provider of forwarding rules synchronization functionality.
  */
+@Singleton
+@Component(service = { })
 public class ForwardingRulesSyncProvider implements AutoCloseable {
-
     private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
     private static final String FRS_EXECUTOR_PREFIX = "FRS-executor-";
 
@@ -65,11 +68,13 @@ public class ForwardingRulesSyncProvider implements AutoCloseable {
     private ListenerRegistration<?> dataTreeConfigChangeListener;
     private ListenerRegistration<?> dataTreeOperationalChangeListener;
 
-    private final ListeningExecutorService syncThreadPool;
+    private final ExecutorService syncThreadPool;
 
-    public ForwardingRulesSyncProvider(final DataBroker dataBroker,
-                                       final RpcConsumerRegistry rpcRegistry,
-                                       final ClusterSingletonServiceProvider clusterSingletonService) {
+    @Inject
+    @Activate
+    public ForwardingRulesSyncProvider(@Reference final DataBroker dataBroker,
+            @Reference final RpcConsumerRegistry rpcRegistry,
+            @Reference final ClusterSingletonServiceProvider clusterSingletonService) {
         requireNonNull(rpcRegistry, "RpcConsumerRegistry can not be null!");
         dataService = requireNonNull(dataBroker, "DataBroker can not be null!");
         this.clusterSingletonService = requireNonNull(clusterSingletonService,
@@ -83,33 +88,30 @@ public class ForwardingRulesSyncProvider implements AutoCloseable {
                 FLOW_CAPABLE_NODE_WC_PATH);
         nodeOperationalDataTreePath = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
 
-        final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                .setNameFormat(FRS_EXECUTOR_PREFIX + "%d")
-                .setDaemon(false)
-                .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
-                .build());
-        syncThreadPool = MoreExecutors.listeningDecorator(executorService);
-    }
+        syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+            .setNameFormat(FRS_EXECUTOR_PREFIX + "%d")
+            .setDaemon(false)
+            .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
+            .build());
 
-    public void init() {
-        final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl(processFlatBatch);
+        final var syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl(processFlatBatch);
 
-        final ReconciliationRegistry reconciliationRegistry = new ReconciliationRegistry();
-        final DeviceMastershipManager deviceMastershipManager =
-                new DeviceMastershipManager(clusterSingletonService, reconciliationRegistry);
+        final var reconciliationRegistry = new ReconciliationRegistry();
+        final var deviceMastershipManager = new DeviceMastershipManager(clusterSingletonService,
+            reconciliationRegistry);
 
-        final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
-        final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
-        final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry);
-        final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+        final var syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
+        final var syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
+        final var syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry);
+        final var syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
 
-        final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
+        final var reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
 
-        final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
+        final var configSnapshot = new FlowCapableNodeSnapshotDao();
+        final var operationalSnapshot = new FlowCapableNodeSnapshotDao();
+        final var configDao = new FlowCapableNodeCachedDao(configSnapshot,
                 new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
-        final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
+        final var operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
                 new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
 
         final NodeListener<FlowCapableNode> nodeListenerConfig =
@@ -121,10 +123,11 @@ public class ForwardingRulesSyncProvider implements AutoCloseable {
                 dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
         dataTreeOperationalChangeListener =
                 dataService.registerDataTreeChangeListener(nodeOperationalDataTreePath, nodeListenerOperational);
-
-        LOG.info("ForwardingRulesSync has started.");
+        LOG.info("ForwardingRulesSync started");
     }
 
+    @PreDestroy
+    @Deactivate
     @Override
     public void close() {
         if (dataTreeConfigChangeListener != null) {
@@ -138,5 +141,6 @@ public class ForwardingRulesSyncProvider implements AutoCloseable {
         }
 
         syncThreadPool.shutdown();
+        LOG.info("ForwardingRulesSync stopped");
     }
 }
index bb418b2b447a57d7d7b8b12f0240d07aafa5d64a..82bc6b1d74251dc94392c39de887c326840691cb 100644 (file)
@@ -7,15 +7,17 @@
  */
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,31 +26,32 @@ import org.slf4j.LoggerFactory;
  * Decorator for running delegate syncup in Future.
  */
 public class SyncReactorFutureDecorator implements SyncReactor {
-
     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);
+
     private final SyncReactor delegate;
-    private final ListeningExecutorService executorService;
+    private final Executor executor;
 
-    public SyncReactorFutureDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
-        this.delegate = delegate;
-        this.executorService = executorService;
+    public SyncReactorFutureDecorator(final SyncReactor delegate, final Executor executor) {
+        this.delegate = requireNonNull(delegate);
+        this.executor = requireNonNull(executor);
     }
 
+    @Override
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) {
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
-        return executorService.submit(() -> {
+            final SyncupEntry syncupEntry) {
+        final var nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        return Futures.submit(() -> {
             try {
                 return doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS);
             } catch (TimeoutException e) {
                 LOG.warn("Syncup future timeout occured {}", nodeId.getValue());
                 return Boolean.FALSE;
             }
-        });
+        }, executor);
     }
 
     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                                         final SyncupEntry syncupEntry) {
+            final SyncupEntry syncupEntry) {
         return delegate.syncup(flowcapableNodePath, syncupEntry);
     }
 }
index 8f49b42a9dfb2ff21abd255115d1fd1f29b00a9c..5b77f5089e55dd15ed9770a0e392d1de912bb541 100644 (file)
@@ -9,9 +9,9 @@ package org.opendaylight.openflowplugin.applications.frsync.impl;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
@@ -24,18 +24,17 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
  * Enriches {@link SyncReactorFutureDecorator} with state compression.
  */
 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
-
     private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
     private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
             new SemaphoreKeeperGuavaImpl<>(1, true);
 
-    public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
-        super(delegate, executorService);
+    public SyncReactorFutureZipDecorator(final SyncReactor delegate, final Executor executor) {
+        super(delegate, executor);
     }
 
     @Override
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) {
+            final SyncupEntry syncupEntry) {
         Semaphore guard = null;
         try {
             guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
@@ -54,9 +53,8 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
 
     @Override
     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                                         final SyncupEntry syncupEntry) {
-        final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
-
+            final SyncupEntry syncupEntry) {
+        final var lastCompressionState = removeLastCompressionState(flowcapableNodePath);
         if (lastCompressionState == null) {
             return Futures.immediateFuture(Boolean.TRUE);
         } else {
@@ -70,9 +68,8 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
      * entry (config vs. operational is coming) in queue otherwise.
      */
     private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                           final SyncupEntry syncupEntry) {
-        final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
-
+            final SyncupEntry syncupEntry) {
+        final var previousEntry = compressionQueue.get(flowcapableNodePath);
         if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
             updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
         } else {
@@ -82,11 +79,9 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
     }
 
     private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry actual,
-                                            final SyncupEntry previous) {
-        final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
-                                                         previous.getBefore(), previous.getDsTypeBefore());
-        compressionQueue.put(flowcapableNodePath, updatedEntry);
+            final SyncupEntry actual, final SyncupEntry previous) {
+        compressionQueue.put(flowcapableNodePath, new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
+            previous.getBefore(), previous.getDsTypeBefore()));
     }
 
     private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
diff --git a/applications/forwardingrules-sync/src/main/resources/OSGI-INF/blueprint/forwardingrules-sync.xml b/applications/forwardingrules-sync/src/main/resources/OSGI-INF/blueprint/forwardingrules-sync.xml
deleted file mode 100644 (file)
index 29ae9e3..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
-
- This program and the accompanying materials are made available under the
- terms of the Eclipse Public License v1.0 which accompanies this distribution,
- and is available at http://www.eclipse.org/legal/epl-v10.html
--->
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
-           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
-           odl:use-default-for-reference-types="true">
-
-    <reference id="dataBroker" interface="org.opendaylight.mdsal.binding.api.DataBroker"/>
-    <reference id="rpcRegistry" interface="org.opendaylight.mdsal.binding.api.RpcConsumerRegistry"/>
-    <reference id="clusterSingletonService" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
-
-    <bean id="frSync" class="org.opendaylight.openflowplugin.applications.frsync.impl.ForwardingRulesSyncProvider"
-          init-method="init" destroy-method="close">
-        <argument ref="dataBroker"/>
-        <argument ref="rpcRegistry"/>
-        <argument ref="clusterSingletonService"/>
-    </bean>
-
-</blueprint>
index 97a34d7ffa2fe933d0828bc9ba7ed75818223815..5f953be95aad92692364916200f788e5982e0841 100644 (file)
@@ -7,28 +7,27 @@
  */
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
-import org.junit.After;
-import org.junit.Before;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatch;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTable;
-import org.opendaylight.yangtools.yang.binding.Rpc;
 
 /**
  * Test for {@link ForwardingRulesSyncProvider}.
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ForwardingRulesSyncProviderTest {
-
-    private ForwardingRulesSyncProvider provider;
     @Mock
     private DataBroker dataBroker;
     @Mock
@@ -36,31 +35,14 @@ public class ForwardingRulesSyncProviderTest {
     @Mock
     private ClusterSingletonServiceProvider clusterSingletonService;
 
-    @Before
-    public void setUp() {
-        Mockito.when(rpcRegistry.getRpc(ArgumentMatchers.<Class<? extends Rpc>>any()))
-                .thenAnswer(invocation -> {
-                    Class<? extends Rpc> serviceType =
-                            (Class<? extends Rpc>) invocation.getArguments()[0];
-                    return Mockito.mock(serviceType);
-                });
-
-        provider = new ForwardingRulesSyncProvider(dataBroker, rpcRegistry, clusterSingletonService);
-        Mockito.verify(rpcRegistry).getRpc(UpdateTable.class);
-        Mockito.verify(rpcRegistry).getRpc(ProcessFlatBatch.class);
-    }
-
     @Test
     public void testInit() {
-        provider.init();
-
-        Mockito.verify(dataBroker, Mockito.times(2)).registerDataTreeChangeListener(
-                ArgumentMatchers.any(), ArgumentMatchers.any());
-    }
+        when(rpcRegistry.getRpc(any())).thenAnswer(invocation -> mock(invocation.<Class<?>>getArgument(0)));
 
-    @After
-    public void tearDown() {
-        provider.close();
+        try (var provider = new ForwardingRulesSyncProvider(dataBroker, rpcRegistry, clusterSingletonService)) {
+            verify(rpcRegistry).getRpc(UpdateTable.class);
+            verify(rpcRegistry).getRpc(ProcessFlatBatch.class);
+            verify(dataBroker, times(2)).registerDataTreeChangeListener(any(), any());
+        }
     }
-
 }