<packaging>bundle</packaging>
<dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.guicedee.services</groupId>
+ <artifactId>javax.inject</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.annotation</groupId>
+ <artifactId>jakarta.annotation-api</artifactId>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>org.opendaylight.mdsal</groupId>
<artifactId>mdsal-binding-api</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-singleton-common-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.openflowplugin.model</groupId>
<artifactId>model-inventory</artifactId>
</dependency>
-
<dependency>
<groupId>org.opendaylight.openflowplugin.model</groupId>
<artifactId>model-flow-service</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-common</artifactId>
- </dependency>
-
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>concepts</artifactId>
</dependency>
-
<dependency>
- <groupId>org.opendaylight.mdsal</groupId>
- <artifactId>mdsal-singleton-common-api</artifactId>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
</dependency>
-
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.component.annotations</artifactId>
</dependency>
<dependency>
<groupId>org.opendaylight.mdsal</groupId>
<artifactId>mdsal-binding-test-utils</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.sonarsource.java</groupId>
<artifactId>sonar-jacoco-listeners</artifactId>
</dependency>
</dependencies>
- <build>
- <resources>
- <resource>
- <filtering>true</filtering>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
-
- <plugins>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
<scm>
<connection>scm:git:ssh://git.opendaylight.org:29418/openflowplugin.git</connection>
<developerConnection>scm:git:ssh://git.opendaylight.org:29418/openflowplugin.git</developerConnection>
<url>https://wiki.opendaylight.org/view/OpenDaylight_OpenFlow_Plugin:Main</url>
<tag>HEAD</tag>
</scm>
-
</project>
import static java.util.Objects.requireNonNull;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
-import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Top provider of forwarding rules synchronization functionality.
*/
+@Singleton
+@Component(service = { })
public class ForwardingRulesSyncProvider implements AutoCloseable {
-
private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
private static final String FRS_EXECUTOR_PREFIX = "FRS-executor-";
private ListenerRegistration<?> dataTreeConfigChangeListener;
private ListenerRegistration<?> dataTreeOperationalChangeListener;
- private final ListeningExecutorService syncThreadPool;
+ private final ExecutorService syncThreadPool;
- public ForwardingRulesSyncProvider(final DataBroker dataBroker,
- final RpcConsumerRegistry rpcRegistry,
- final ClusterSingletonServiceProvider clusterSingletonService) {
+ @Inject
+ @Activate
+ public ForwardingRulesSyncProvider(@Reference final DataBroker dataBroker,
+ @Reference final RpcConsumerRegistry rpcRegistry,
+ @Reference final ClusterSingletonServiceProvider clusterSingletonService) {
requireNonNull(rpcRegistry, "RpcConsumerRegistry can not be null!");
dataService = requireNonNull(dataBroker, "DataBroker can not be null!");
this.clusterSingletonService = requireNonNull(clusterSingletonService,
FLOW_CAPABLE_NODE_WC_PATH);
nodeOperationalDataTreePath = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
- final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat(FRS_EXECUTOR_PREFIX + "%d")
- .setDaemon(false)
- .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
- .build());
- syncThreadPool = MoreExecutors.listeningDecorator(executorService);
- }
+ syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat(FRS_EXECUTOR_PREFIX + "%d")
+ .setDaemon(false)
+ .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
+ .build());
- public void init() {
- final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl(processFlatBatch);
+ final var syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl(processFlatBatch);
- final ReconciliationRegistry reconciliationRegistry = new ReconciliationRegistry();
- final DeviceMastershipManager deviceMastershipManager =
- new DeviceMastershipManager(clusterSingletonService, reconciliationRegistry);
+ final var reconciliationRegistry = new ReconciliationRegistry();
+ final var deviceMastershipManager = new DeviceMastershipManager(clusterSingletonService,
+ reconciliationRegistry);
- final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
- final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
- final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry);
- final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+ final var syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
+ final var syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
+ final var syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry);
+ final var syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
- final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
+ final var reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
- final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
+ final var configSnapshot = new FlowCapableNodeSnapshotDao();
+ final var operationalSnapshot = new FlowCapableNodeSnapshotDao();
+ final var configDao = new FlowCapableNodeCachedDao(configSnapshot,
new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
- final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
+ final var operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
final NodeListener<FlowCapableNode> nodeListenerConfig =
dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
dataTreeOperationalChangeListener =
dataService.registerDataTreeChangeListener(nodeOperationalDataTreePath, nodeListenerOperational);
-
- LOG.info("ForwardingRulesSync has started.");
+ LOG.info("ForwardingRulesSync started");
}
+ @PreDestroy
+ @Deactivate
@Override
public void close() {
if (dataTreeConfigChangeListener != null) {
}
syncThreadPool.shutdown();
+ LOG.info("ForwardingRulesSync stopped");
}
}
*/
package org.opendaylight.openflowplugin.applications.frsync.impl;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Decorator for running delegate syncup in Future.
*/
public class SyncReactorFutureDecorator implements SyncReactor {
-
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);
+
private final SyncReactor delegate;
- private final ListeningExecutorService executorService;
+ private final Executor executor;
- public SyncReactorFutureDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
- this.delegate = delegate;
- this.executorService = executorService;
+ public SyncReactorFutureDecorator(final SyncReactor delegate, final Executor executor) {
+ this.delegate = requireNonNull(delegate);
+ this.executor = requireNonNull(executor);
}
+ @Override
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final SyncupEntry syncupEntry) {
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- return executorService.submit(() -> {
+ final SyncupEntry syncupEntry) {
+ final var nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ return Futures.submit(() -> {
try {
return doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.warn("Syncup future timeout occured {}", nodeId.getValue());
return Boolean.FALSE;
}
- });
+ }, executor);
}
protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final SyncupEntry syncupEntry) {
+ final SyncupEntry syncupEntry) {
return delegate.syncup(flowcapableNodePath, syncupEntry);
}
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
* Enriches {@link SyncReactorFutureDecorator} with state compression.
*/
public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
-
private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
new SemaphoreKeeperGuavaImpl<>(1, true);
- public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
- super(delegate, executorService);
+ public SyncReactorFutureZipDecorator(final SyncReactor delegate, final Executor executor) {
+ super(delegate, executor);
}
@Override
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final SyncupEntry syncupEntry) {
+ final SyncupEntry syncupEntry) {
Semaphore guard = null;
try {
guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
@Override
protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final SyncupEntry syncupEntry) {
- final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
-
+ final SyncupEntry syncupEntry) {
+ final var lastCompressionState = removeLastCompressionState(flowcapableNodePath);
if (lastCompressionState == null) {
return Futures.immediateFuture(Boolean.TRUE);
} else {
* entry (config vs. operational is coming) in queue otherwise.
*/
private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final SyncupEntry syncupEntry) {
- final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
-
+ final SyncupEntry syncupEntry) {
+ final var previousEntry = compressionQueue.get(flowcapableNodePath);
if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
} else {
}
private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final SyncupEntry actual,
- final SyncupEntry previous) {
- final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
- previous.getBefore(), previous.getDsTypeBefore());
- compressionQueue.put(flowcapableNodePath, updatedEntry);
+ final SyncupEntry actual, final SyncupEntry previous) {
+ compressionQueue.put(flowcapableNodePath, new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
+ previous.getBefore(), previous.getDsTypeBefore()));
}
private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
-
- This program and the accompanying materials are made available under the
- terms of the Eclipse Public License v1.0 which accompanies this distribution,
- and is available at http://www.eclipse.org/legal/epl-v10.html
--->
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
- xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
- odl:use-default-for-reference-types="true">
-
- <reference id="dataBroker" interface="org.opendaylight.mdsal.binding.api.DataBroker"/>
- <reference id="rpcRegistry" interface="org.opendaylight.mdsal.binding.api.RpcConsumerRegistry"/>
- <reference id="clusterSingletonService" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
-
- <bean id="frSync" class="org.opendaylight.openflowplugin.applications.frsync.impl.ForwardingRulesSyncProvider"
- init-method="init" destroy-method="close">
- <argument ref="dataBroker"/>
- <argument ref="rpcRegistry"/>
- <argument ref="clusterSingletonService"/>
- </bean>
-
-</blueprint>
*/
package org.opendaylight.openflowplugin.applications.frsync.impl;
-import org.junit.After;
-import org.junit.Before;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatch;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTable;
-import org.opendaylight.yangtools.yang.binding.Rpc;
/**
* Test for {@link ForwardingRulesSyncProvider}.
*/
@RunWith(MockitoJUnitRunner.class)
public class ForwardingRulesSyncProviderTest {
-
- private ForwardingRulesSyncProvider provider;
@Mock
private DataBroker dataBroker;
@Mock
@Mock
private ClusterSingletonServiceProvider clusterSingletonService;
- @Before
- public void setUp() {
- Mockito.when(rpcRegistry.getRpc(ArgumentMatchers.<Class<? extends Rpc>>any()))
- .thenAnswer(invocation -> {
- Class<? extends Rpc> serviceType =
- (Class<? extends Rpc>) invocation.getArguments()[0];
- return Mockito.mock(serviceType);
- });
-
- provider = new ForwardingRulesSyncProvider(dataBroker, rpcRegistry, clusterSingletonService);
- Mockito.verify(rpcRegistry).getRpc(UpdateTable.class);
- Mockito.verify(rpcRegistry).getRpc(ProcessFlatBatch.class);
- }
-
@Test
public void testInit() {
- provider.init();
-
- Mockito.verify(dataBroker, Mockito.times(2)).registerDataTreeChangeListener(
- ArgumentMatchers.any(), ArgumentMatchers.any());
- }
+ when(rpcRegistry.getRpc(any())).thenAnswer(invocation -> mock(invocation.<Class<?>>getArgument(0)));
- @After
- public void tearDown() {
- provider.close();
+ try (var provider = new ForwardingRulesSyncProvider(dataBroker, rpcRegistry, clusterSingletonService)) {
+ verify(rpcRegistry).getRpc(UpdateTable.class);
+ verify(rpcRegistry).getRpc(ProcessFlatBatch.class);
+ verify(dataBroker, times(2)).registerDataTreeChangeListener(any(), any());
+ }
}
-
}