From cea3db872fbd1bb614fdea67f5b70dbfbdc473cc Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 7 Feb 2024 08:44:42 +0100 Subject: [PATCH] Ditch blueprint from frm-sync forwarding-rules-sync has an extremely simplistic blueprint, replace it with Declarative Services wiring. JIRA: OPNFLWPLUG-1112 Change-Id: I35d9add3e1b861cdd59000ed9ba0e395f706e6a6 Signed-off-by: Robert Varga --- applications/forwardingrules-sync/pom.xml | 60 ++++++---------- .../impl/ForwardingRulesSyncProvider.java | 70 ++++++++++--------- .../impl/SyncReactorFutureDecorator.java | 27 +++---- .../impl/SyncReactorFutureZipDecorator.java | 27 +++---- .../blueprint/forwardingrules-sync.xml | 24 ------- .../impl/ForwardingRulesSyncProviderTest.java | 42 ++++------- 6 files changed, 97 insertions(+), 153 deletions(-) delete mode 100644 applications/forwardingrules-sync/src/main/resources/OSGI-INF/blueprint/forwardingrules-sync.xml diff --git a/applications/forwardingrules-sync/pom.xml b/applications/forwardingrules-sync/pom.xml index 6ff0d095d8..5266aa91f1 100644 --- a/applications/forwardingrules-sync/pom.xml +++ b/applications/forwardingrules-sync/pom.xml @@ -13,52 +13,53 @@ bundle + + com.google.guava + guava + + + com.guicedee.services + javax.inject + true + + + jakarta.annotation + jakarta.annotation-api + true + org.opendaylight.mdsal mdsal-binding-api - + + org.opendaylight.mdsal + mdsal-singleton-common-api + org.opendaylight.openflowplugin.model model-inventory - org.opendaylight.openflowplugin.model model-flow-service - - - org.opendaylight.yangtools - yang-common - - org.opendaylight.yangtools concepts - - org.opendaylight.mdsal - mdsal-singleton-common-api + org.opendaylight.yangtools + yang-common - - com.google.guava - guava + org.osgi + org.osgi.service.component.annotations org.opendaylight.mdsal mdsal-binding-test-utils - - - org.slf4j - slf4j-simple - test - - org.sonarsource.java sonar-jacoco-listeners @@ -67,27 +68,10 @@ - - - - true - src/main/resources - - - - - - org.apache.felix - maven-bundle-plugin - - - - scm:git:ssh://git.opendaylight.org:29418/openflowplugin.git scm:git:ssh://git.opendaylight.org:29418/openflowplugin.git https://wiki.opendaylight.org/view/OpenDaylight_OpenFlow_Plugin:Main HEAD - diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java index 537d16f38b..b4db87d40d 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java @@ -9,21 +9,19 @@ package org.opendaylight.openflowplugin.applications.frsync.impl; import static java.util.Objects.requireNonNull; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.DataTreeIdentifier; import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowplugin.applications.frsync.NodeListener; -import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy; -import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao; -import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao; import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao; import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao; import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager; @@ -36,14 +34,19 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTable; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Top provider of forwarding rules synchronization functionality. */ +@Singleton +@Component(service = { }) public class ForwardingRulesSyncProvider implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class); private static final String FRS_EXECUTOR_PREFIX = "FRS-executor-"; @@ -65,11 +68,13 @@ public class ForwardingRulesSyncProvider implements AutoCloseable { private ListenerRegistration dataTreeConfigChangeListener; private ListenerRegistration dataTreeOperationalChangeListener; - private final ListeningExecutorService syncThreadPool; + private final ExecutorService syncThreadPool; - public ForwardingRulesSyncProvider(final DataBroker dataBroker, - final RpcConsumerRegistry rpcRegistry, - final ClusterSingletonServiceProvider clusterSingletonService) { + @Inject + @Activate + public ForwardingRulesSyncProvider(@Reference final DataBroker dataBroker, + @Reference final RpcConsumerRegistry rpcRegistry, + @Reference final ClusterSingletonServiceProvider clusterSingletonService) { requireNonNull(rpcRegistry, "RpcConsumerRegistry can not be null!"); dataService = requireNonNull(dataBroker, "DataBroker can not be null!"); this.clusterSingletonService = requireNonNull(clusterSingletonService, @@ -83,33 +88,30 @@ public class ForwardingRulesSyncProvider implements AutoCloseable { FLOW_CAPABLE_NODE_WC_PATH); nodeOperationalDataTreePath = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH); - final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat(FRS_EXECUTOR_PREFIX + "%d") - .setDaemon(false) - .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex)) - .build()); - syncThreadPool = MoreExecutors.listeningDecorator(executorService); - } + syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() + .setNameFormat(FRS_EXECUTOR_PREFIX + "%d") + .setDaemon(false) + .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex)) + .build()); - public void init() { - final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl(processFlatBatch); + final var syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl(processFlatBatch); - final ReconciliationRegistry reconciliationRegistry = new ReconciliationRegistry(); - final DeviceMastershipManager deviceMastershipManager = - new DeviceMastershipManager(clusterSingletonService, reconciliationRegistry); + final var reconciliationRegistry = new ReconciliationRegistry(); + final var deviceMastershipManager = new DeviceMastershipManager(clusterSingletonService, + reconciliationRegistry); - final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy); - final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry); - final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry); - final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool); + final var syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy); + final var syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry); + final var syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry); + final var syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool); - final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager); + final var reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager); - final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao(); - final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao(); - final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot, + final var configSnapshot = new FlowCapableNodeSnapshotDao(); + final var operationalSnapshot = new FlowCapableNodeSnapshotDao(); + final var configDao = new FlowCapableNodeCachedDao(configSnapshot, new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION)); - final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot, + final var operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot, new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL)); final NodeListener nodeListenerConfig = @@ -121,10 +123,11 @@ public class ForwardingRulesSyncProvider implements AutoCloseable { dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig); dataTreeOperationalChangeListener = dataService.registerDataTreeChangeListener(nodeOperationalDataTreePath, nodeListenerOperational); - - LOG.info("ForwardingRulesSync has started."); + LOG.info("ForwardingRulesSync started"); } + @PreDestroy + @Deactivate @Override public void close() { if (dataTreeConfigChangeListener != null) { @@ -138,5 +141,6 @@ public class ForwardingRulesSyncProvider implements AutoCloseable { } syncThreadPool.shutdown(); + LOG.info("ForwardingRulesSync stopped"); } } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java index bb418b2b44..82bc6b1d74 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java @@ -7,15 +7,17 @@ */ package org.opendaylight.openflowplugin.applications.frsync.impl; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil; import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,31 +26,32 @@ import org.slf4j.LoggerFactory; * Decorator for running delegate syncup in Future. */ public class SyncReactorFutureDecorator implements SyncReactor { - private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class); + private final SyncReactor delegate; - private final ListeningExecutorService executorService; + private final Executor executor; - public SyncReactorFutureDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) { - this.delegate = delegate; - this.executorService = executorService; + public SyncReactorFutureDecorator(final SyncReactor delegate, final Executor executor) { + this.delegate = requireNonNull(delegate); + this.executor = requireNonNull(executor); } + @Override public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) { - final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); - return executorService.submit(() -> { + final SyncupEntry syncupEntry) { + final var nodeId = PathUtil.digNodeId(flowcapableNodePath); + return Futures.submit(() -> { try { return doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { LOG.warn("Syncup future timeout occured {}", nodeId.getValue()); return Boolean.FALSE; } - }); + }, executor); } protected ListenableFuture doSyncupInFuture(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) { + final SyncupEntry syncupEntry) { return delegate.syncup(flowcapableNodePath, syncupEntry); } } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java index 8f49b42a9d..5b77f5089e 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java @@ -9,9 +9,9 @@ package org.opendaylight.openflowplugin.applications.frsync.impl; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper; import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; @@ -24,18 +24,17 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; * Enriches {@link SyncReactorFutureDecorator} with state compression. */ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator { - private final Map, SyncupEntry> compressionQueue = new HashMap<>(); private final SemaphoreKeeper> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true); - public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) { - super(delegate, executorService); + public SyncReactorFutureZipDecorator(final SyncReactor delegate, final Executor executor) { + super(delegate, executor); } @Override public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) { + final SyncupEntry syncupEntry) { Semaphore guard = null; try { guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath); @@ -54,9 +53,8 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator { @Override protected ListenableFuture doSyncupInFuture(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) { - final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath); - + final SyncupEntry syncupEntry) { + final var lastCompressionState = removeLastCompressionState(flowcapableNodePath); if (lastCompressionState == null) { return Futures.immediateFuture(Boolean.TRUE); } else { @@ -70,9 +68,8 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator { * entry (config vs. operational is coming) in queue otherwise. */ private boolean updateCompressionState(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) { - final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath); - + final SyncupEntry syncupEntry) { + final var previousEntry = compressionQueue.get(flowcapableNodePath); if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) { updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry); } else { @@ -82,11 +79,9 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator { } private void updateOptimizedConfigDelta(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry actual, - final SyncupEntry previous) { - final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(), - previous.getBefore(), previous.getDsTypeBefore()); - compressionQueue.put(flowcapableNodePath, updatedEntry); + final SyncupEntry actual, final SyncupEntry previous) { + compressionQueue.put(flowcapableNodePath, new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(), + previous.getBefore(), previous.getDsTypeBefore())); } private SyncupEntry removeLastCompressionState(final InstanceIdentifier flowcapableNodePath) { diff --git a/applications/forwardingrules-sync/src/main/resources/OSGI-INF/blueprint/forwardingrules-sync.xml b/applications/forwardingrules-sync/src/main/resources/OSGI-INF/blueprint/forwardingrules-sync.xml deleted file mode 100644 index 29ae9e37c8..0000000000 --- a/applications/forwardingrules-sync/src/main/resources/OSGI-INF/blueprint/forwardingrules-sync.xml +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - - - - - - - - - diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java index 97a34d7ffa..5f953be95a 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java @@ -7,28 +7,27 @@ */ package org.opendaylight.openflowplugin.applications.frsync.impl; -import org.junit.After; -import org.junit.Before; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatch; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTable; -import org.opendaylight.yangtools.yang.binding.Rpc; /** * Test for {@link ForwardingRulesSyncProvider}. */ @RunWith(MockitoJUnitRunner.class) public class ForwardingRulesSyncProviderTest { - - private ForwardingRulesSyncProvider provider; @Mock private DataBroker dataBroker; @Mock @@ -36,31 +35,14 @@ public class ForwardingRulesSyncProviderTest { @Mock private ClusterSingletonServiceProvider clusterSingletonService; - @Before - public void setUp() { - Mockito.when(rpcRegistry.getRpc(ArgumentMatchers.>any())) - .thenAnswer(invocation -> { - Class serviceType = - (Class) invocation.getArguments()[0]; - return Mockito.mock(serviceType); - }); - - provider = new ForwardingRulesSyncProvider(dataBroker, rpcRegistry, clusterSingletonService); - Mockito.verify(rpcRegistry).getRpc(UpdateTable.class); - Mockito.verify(rpcRegistry).getRpc(ProcessFlatBatch.class); - } - @Test public void testInit() { - provider.init(); - - Mockito.verify(dataBroker, Mockito.times(2)).registerDataTreeChangeListener( - ArgumentMatchers.any(), ArgumentMatchers.any()); - } + when(rpcRegistry.getRpc(any())).thenAnswer(invocation -> mock(invocation.>getArgument(0))); - @After - public void tearDown() { - provider.close(); + try (var provider = new ForwardingRulesSyncProvider(dataBroker, rpcRegistry, clusterSingletonService)) { + verify(rpcRegistry).getRpc(UpdateTable.class); + verify(rpcRegistry).getRpc(ProcessFlatBatch.class); + verify(dataBroker, times(2)).registerDataTreeChangeListener(any(), any()); + } } - } -- 2.36.6