Merge "Bug 5577 Retry mechanism"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / ForwardingRulesSyncProvider.java
index 57c404b6d8cca5c7a0913e960e1e8cb5f189aa12..9376f8320e079121629197966fdd2b2404aa6edb 100644 (file)
@@ -27,16 +27,13 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDa
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -53,11 +50,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
     public static final int STARTUP_LOOP_MAX_RETRIES = 8;
 
     private final DataBroker dataService;
-    private final SalFlowService salFlowService;
-    private final SalGroupService salGroupService;
-    private final SalMeterService salMeterService;
     private final SalTableService salTableService;
-    private final FlowCapableTransactionService transactionService;
     private final SalFlatBatchService flatBatchService;
 
     /** wildcard path to flow-capable-node augmentation of inventory node */
@@ -74,31 +67,17 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
     private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
     private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
 
-
     public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
                                        final DataBroker dataBroker,
                                        final RpcConsumerRegistry rpcRegistry) {
-        this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
-
         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
-
-        this.salFlowService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlowService.class),
-                "RPC SalFlowService not found.");
-        this.salGroupService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalGroupService.class),
-                "RPC SalGroupService not found.");
-        this.salMeterService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalMeterService.class),
-                "RPC SalMeterService not found.");
+        this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
         this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
                 "RPC SalTableService not found.");
-        this.transactionService =
-                Preconditions.checkNotNull(rpcRegistry.getRpcService(FlowCapableTransactionService.class),
-                        "RPC SalTableService not found.");
-        this.flatBatchService =
-                Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
-                        "RPC SalFlatBatchService not found.");
-
-        nodeConfigDataTreePath =
-                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
+        this.flatBatchService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
+                "RPC SalFlatBatchService not found.");
+
+        nodeConfigDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
         nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
 
         broker.registerProvider(this);
@@ -121,69 +100,56 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
 
     @Override
     public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
-        final FlowForwarder flowForwarder = new FlowForwarder(salFlowService);
-        final GroupForwarder groupForwarder = new GroupForwarder(salGroupService);
-        final MeterForwarder meterForwarder = new MeterForwarder(salMeterService);
         final TableForwarder tableForwarder = new TableForwarder(salTableService);
 
-        {
-            //TODO: make is switchable
-//            final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyIncrementalImpl()
-//                    .setFlowForwarder(flowForwarder)
-//                    .setGroupForwarder(groupForwarder)
-//                    .setMeterForwarder(meterForwarder)
-//                    .setTableForwarder(tableForwarder)
-//                    .setTransactionService(transactionService);
-
-            final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
-                    .setFlatBatchService(flatBatchService)
-                    .setTableForwarder(tableForwarder);
-
-            final SyncReactorImpl syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
-            final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl,
-                    new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
-
-            final SyncReactor cfgReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
-            final SyncReactor operReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
-
-            final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
-            final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
-            final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
-                    new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
-            final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
-                    new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
-
-            final NodeListener<FlowCapableNode> nodeListenerConfig =
-                    new SimplifiedConfigListener(
-                            cfgReactor,
-                            configSnapshot, operationalDao);
-            final NodeListener<Node> nodeListenerOperational =
-                    new SimplifiedOperationalListener(operReactor, operationalSnapshot, configDao);
-
-            try {
-                SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
-                dataTreeConfigChangeListener = looper1.loopUntilNoException(
-                        new Callable<ListenerRegistration<NodeListener>>() {
-                            @Override
-                            public ListenerRegistration<NodeListener> call() throws Exception {
-                                return dataService.registerDataTreeChangeListener(
-                                        nodeConfigDataTreePath, nodeListenerConfig);
-                            }
-                        });
-
-                SimpleTaskRetryLooper looper2 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
-                dataTreeOperationalChangeListener = looper2.loopUntilNoException(
-                        new Callable<ListenerRegistration<NodeListener>>() {
-                            @Override
-                            public ListenerRegistration<NodeListener> call() throws Exception {
-                                return dataService.registerDataTreeChangeListener(
-                                        nodeOperationalDataTreePath, nodeListenerOperational);
-                            }
-                        });
-            } catch (final Exception e) {
-                LOG.warn("FR-Sync node DataChange listener registration fail!", e);
-                throw new IllegalStateException("FR-Sync startup fail!", e);
-            }
+        final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
+                .setFlatBatchService(flatBatchService)
+                .setTableForwarder(tableForwarder);
+
+        final RetryRegistry retryRegistry = new RetryRegistry();
+
+        final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
+        final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
+        final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
+                new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+
+        final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+
+        final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+        final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
+                new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
+        final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
+                new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
+
+        final NodeListener<FlowCapableNode> nodeListenerConfig =
+                new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
+        final NodeListener<Node> nodeListenerOperational =
+                new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
+
+        try {
+            SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
+            dataTreeConfigChangeListener = looper1.loopUntilNoException(
+                    new Callable<ListenerRegistration<NodeListener>>() {
+                        @Override
+                        public ListenerRegistration<NodeListener> call() throws Exception {
+                            return dataService.registerDataTreeChangeListener(
+                                    nodeConfigDataTreePath, nodeListenerConfig);
+                        }
+                    });
+
+            SimpleTaskRetryLooper looper2 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
+            dataTreeOperationalChangeListener = looper2.loopUntilNoException(
+                    new Callable<ListenerRegistration<NodeListener>>() {
+                        @Override
+                        public ListenerRegistration<NodeListener> call() throws Exception {
+                            return dataService.registerDataTreeChangeListener(
+                                    nodeOperationalDataTreePath, nodeListenerOperational);
+                        }
+                    });
+        } catch (final Exception e) {
+            LOG.warn("FR-Sync node DataChange listener registration fail!", e);
+            throw new IllegalStateException("FR-Sync startup fail!", e);
         }
         LOG.info("ForwardingRulesSync has started.");
     }