Ditch blueprint from frm-sync
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / ForwardingRulesSyncProvider.java
index 24bfeafe06522d730c736a3bd82a83491562a42e..b4db87d40da7d66206c2020ae827de6bcc43416f 100644 (file)
@@ -1,57 +1,59 @@
-/**
+/*
  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.Callable;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
-import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
-import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
-import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
-import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatch;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Top provider of forwarding rules synchronization functionality.
  */
-public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareProvider {
-
+@Singleton
+@Component(service = { })
+public class ForwardingRulesSyncProvider implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
-    private static final int STARTUP_LOOP_TICK = 500;
-    private static final int STARTUP_LOOP_MAX_RETRIES = 8;
+    private static final String FRS_EXECUTOR_PREFIX = "FRS-executor-";
 
     private final DataBroker dataService;
-    private final SalTableService salTableService;
-    private final SalFlatBatchService flatBatchService;
+    private final ClusterSingletonServiceProvider clusterSingletonService;
+    private final UpdateTable updateTable;
+    private final ProcessFlatBatch processFlatBatch;
 
     /** Wildcard path to flow-capable-node augmentation of inventory node. */
     private static final InstanceIdentifier<FlowCapableNode> FLOW_CAPABLE_NODE_WC_PATH =
@@ -60,110 +62,85 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
     private static final InstanceIdentifier<Node> NODE_WC_PATH =
             InstanceIdentifier.create(Nodes.class).child(Node.class);
 
-
     private final DataTreeIdentifier<FlowCapableNode> nodeConfigDataTreePath;
     private final DataTreeIdentifier<Node> nodeOperationalDataTreePath;
 
-    private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
-    private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
-
-    public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
-                                       final DataBroker dataBroker,
-                                       final RpcConsumerRegistry rpcRegistry) {
-        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
-        this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
-        this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
-                "RPC SalTableService not found.");
-        this.flatBatchService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
+    private ListenerRegistration<?> dataTreeConfigChangeListener;
+    private ListenerRegistration<?> dataTreeOperationalChangeListener;
+
+    private final ExecutorService syncThreadPool;
+
+    @Inject
+    @Activate
+    public ForwardingRulesSyncProvider(@Reference final DataBroker dataBroker,
+            @Reference final RpcConsumerRegistry rpcRegistry,
+            @Reference final ClusterSingletonServiceProvider clusterSingletonService) {
+        requireNonNull(rpcRegistry, "RpcConsumerRegistry can not be null!");
+        dataService = requireNonNull(dataBroker, "DataBroker can not be null!");
+        this.clusterSingletonService = requireNonNull(clusterSingletonService,
+                "ClusterSingletonServiceProvider can not be null!");
+        updateTable = requireNonNull(rpcRegistry.getRpc(UpdateTable.class),
+                "RPC UpdateTable not found.");
+        processFlatBatch = requireNonNull(rpcRegistry.getRpc(ProcessFlatBatch.class),
                 "RPC SalFlatBatchService not found.");
 
-        nodeConfigDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
-        nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
-
-        broker.registerProvider(this);
-    }
-
-    private final ListeningExecutorService syncThreadPool = FrmExecutors.instance()
-            // TODO improve log in ThreadPoolExecutor.afterExecute
-            // TODO max bloking queue size
-            // TODO core/min pool size
-            .newFixedThreadPool(6, new ThreadFactoryBuilder()
-                    .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
-                    .setDaemon(false)
-                    .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-                        @Override
-                        public void uncaughtException(Thread thread, Throwable e) {
-                            LOG.error("uncaught exception {}", thread, e);
-                        }
-                    })
-                    .build());
+        nodeConfigDataTreePath = DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
+                FLOW_CAPABLE_NODE_WC_PATH);
+        nodeOperationalDataTreePath = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
 
-    @Override
-    public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
-        final TableForwarder tableForwarder = new TableForwarder(salTableService);
+        syncThreadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+            .setNameFormat(FRS_EXECUTOR_PREFIX + "%d")
+            .setDaemon(false)
+            .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
+            .build());
 
-        final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
-                .setFlatBatchService(flatBatchService)
-                .setTableForwarder(tableForwarder);
+        final var syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl(processFlatBatch);
 
-        final RetryRegistry retryRegistry = new RetryRegistry();
+        final var reconciliationRegistry = new ReconciliationRegistry();
+        final var deviceMastershipManager = new DeviceMastershipManager(clusterSingletonService,
+            reconciliationRegistry);
 
-        final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
-        final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
-        final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
-                new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+        final var syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
+        final var syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
+        final var syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry);
+        final var syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
 
-        final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+        final var reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
 
-        final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
-        final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
+        final var configSnapshot = new FlowCapableNodeSnapshotDao();
+        final var operationalSnapshot = new FlowCapableNodeSnapshotDao();
+        final var configDao = new FlowCapableNodeCachedDao(configSnapshot,
                 new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
-        final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
+        final var operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
                 new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
 
         final NodeListener<FlowCapableNode> nodeListenerConfig =
                 new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
-        final NodeListener<Node> nodeListenerOperational =
-                new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
-
-        try {
-            SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
-            dataTreeConfigChangeListener = looper1.loopUntilNoException(
-                    new Callable<ListenerRegistration<NodeListener>>() {
-                        @Override
-                        public ListenerRegistration<NodeListener> call() throws Exception {
-                            return dataService.registerDataTreeChangeListener(
-                                    nodeConfigDataTreePath, nodeListenerConfig);
-                        }
-                    });
-
-            SimpleTaskRetryLooper looper2 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
-            dataTreeOperationalChangeListener = looper2.loopUntilNoException(
-                    new Callable<ListenerRegistration<NodeListener>>() {
-                        @Override
-                        public ListenerRegistration<NodeListener> call() throws Exception {
-                            return dataService.registerDataTreeChangeListener(
-                                    nodeOperationalDataTreePath, nodeListenerOperational);
-                        }
-                    });
-        } catch (final Exception e) {
-            LOG.warn("FR-Sync node DataChange listener registration fail!", e);
-            throw new IllegalStateException("FR-Sync startup fail!", e);
-        }
-        LOG.info("ForwardingRulesSync has started.");
+        final NodeListener<Node> nodeListenerOperational = new SimplifiedOperationalListener(reactor,
+                operationalSnapshot, configDao, reconciliationRegistry, deviceMastershipManager);
+
+        dataTreeConfigChangeListener =
+                dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
+        dataTreeOperationalChangeListener =
+                dataService.registerDataTreeChangeListener(nodeOperationalDataTreePath, nodeListenerOperational);
+        LOG.info("ForwardingRulesSync started");
     }
 
-    public void close() throws Exception {
+    @PreDestroy
+    @Deactivate
+    @Override
+    public void close() {
         if (dataTreeConfigChangeListener != null) {
             dataTreeConfigChangeListener.close();
             dataTreeConfigChangeListener = null;
         }
+
         if (dataTreeOperationalChangeListener != null) {
             dataTreeOperationalChangeListener.close();
             dataTreeOperationalChangeListener = null;
         }
 
         syncThreadPool.shutdown();
+        LOG.info("ForwardingRulesSync stopped");
     }
 }