Merge "Bug 6745 Improve compression queue locking and handle InterruptedException"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / ForwardingRulesSyncProvider.java
index 0e5ae5e568439bec77f662be6b8920c537ab698e..cdbec91a79d4e9c2e7d78d5a7bf974e0999f25db 100644 (file)
@@ -12,15 +12,17 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
@@ -28,8 +30,10 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCa
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
-import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.TableForwarder;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -47,8 +51,10 @@ import org.slf4j.LoggerFactory;
 public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
+    private static final String FRS_EXECUTOR_PREFIX = "FRS-executor-";
 
     private final DataBroker dataService;
+    private final ClusterSingletonServiceProvider clusterSingletonService;
     private final SalTableService salTableService;
     private final SalFlatBatchService flatBatchService;
 
@@ -69,9 +75,12 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
 
     public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
                                        final DataBroker dataBroker,
-                                       final RpcConsumerRegistry rpcRegistry) {
-        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
+                                       final RpcConsumerRegistry rpcRegistry,
+                                       final ClusterSingletonServiceProvider clusterSingletonService) {
+        Preconditions.checkNotNull(rpcRegistry, "RpcConsumerRegistry can not be null!");
         this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
+        this.clusterSingletonService = Preconditions.checkNotNull(clusterSingletonService,
+                "ClusterSingletonServiceProvider can not be null!");
         this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
                 "RPC SalTableService not found.");
         this.flatBatchService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
@@ -81,31 +90,34 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
 
         final ExecutorService executorService= Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+                .setNameFormat(FRS_EXECUTOR_PREFIX + "%d")
                 .setDaemon(false)
                 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
                 .build());
         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
-
         broker.registerProvider(this);
     }
 
     @Override
-    public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
+    public void onSessionInitiated(final ProviderContext providerContext) {
         final TableForwarder tableForwarder = new TableForwarder(salTableService);
 
         final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
                 .setFlatBatchService(flatBatchService)
                 .setTableForwarder(tableForwarder);
 
-        final RetryRegistry retryRegistry = new RetryRegistry();
+        final ReconciliationRegistry reconciliationRegistry = new ReconciliationRegistry();
+        final DeviceMastershipManager deviceMastershipManager =
+                new DeviceMastershipManager(clusterSingletonService, reconciliationRegistry);
 
         final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
-        final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
+        final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
         final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
-                new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+                new SemaphoreKeeperGuavaImpl<>(1, true));
+        final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool,
+                new SemaphoreKeeperGuavaImpl<>(1, true));
 
-        final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+        final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
 
         final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
         final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
@@ -117,7 +129,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         final NodeListener<FlowCapableNode> nodeListenerConfig =
                 new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
         final NodeListener<Node> nodeListenerOperational =
-                new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
+                new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry, deviceMastershipManager);
 
         dataTreeConfigChangeListener =
                 dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
@@ -127,17 +139,18 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         LOG.info("ForwardingRulesSync has started.");
     }
 
-    public void close() throws InterruptedException {
-        if (dataTreeConfigChangeListener != null) {
+    public void close() {
+        if (Objects.nonNull(dataTreeConfigChangeListener)) {
             dataTreeConfigChangeListener.close();
             dataTreeConfigChangeListener = null;
         }
 
-        if (dataTreeOperationalChangeListener != null) {
+        if (Objects.nonNull(dataTreeOperationalChangeListener)) {
             dataTreeOperationalChangeListener.close();
             dataTreeOperationalChangeListener = null;
         }
 
         syncThreadPool.shutdown();
     }
+
 }