import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
public static final int STARTUP_LOOP_MAX_RETRIES = 8;
private final DataBroker dataService;
- private final SalFlowService salFlowService;
- private final SalGroupService salGroupService;
- private final SalMeterService salMeterService;
private final SalTableService salTableService;
- private final FlowCapableTransactionService transactionService;
private final SalFlatBatchService flatBatchService;
/** wildcard path to flow-capable-node augmentation of inventory node */
private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
-
public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
final DataBroker dataBroker,
final RpcConsumerRegistry rpcRegistry) {
- this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
-
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
-
- this.salFlowService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlowService.class),
- "RPC SalFlowService not found.");
- this.salGroupService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalGroupService.class),
- "RPC SalGroupService not found.");
- this.salMeterService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalMeterService.class),
- "RPC SalMeterService not found.");
+ this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
"RPC SalTableService not found.");
- this.transactionService =
- Preconditions.checkNotNull(rpcRegistry.getRpcService(FlowCapableTransactionService.class),
- "RPC SalTableService not found.");
- this.flatBatchService =
- Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
- "RPC SalFlatBatchService not found.");
-
- nodeConfigDataTreePath =
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
+ this.flatBatchService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
+ "RPC SalFlatBatchService not found.");
+
+ nodeConfigDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
broker.registerProvider(this);
@Override
public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
- final FlowForwarder flowForwarder = new FlowForwarder(salFlowService);
- final GroupForwarder groupForwarder = new GroupForwarder(salGroupService);
- final MeterForwarder meterForwarder = new MeterForwarder(salMeterService);
final TableForwarder tableForwarder = new TableForwarder(salTableService);
- {
- //TODO: make is switchable
-// final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyIncrementalImpl()
-// .setFlowForwarder(flowForwarder)
-// .setGroupForwarder(groupForwarder)
-// .setMeterForwarder(meterForwarder)
-// .setTableForwarder(tableForwarder)
-// .setTransactionService(transactionService);
-
- final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
- .setFlatBatchService(flatBatchService)
- .setTableForwarder(tableForwarder);
-
- final SyncReactorImpl syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
- final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl,
- new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
-
- final SyncReactor cfgReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
- final SyncReactor operReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
-
- final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
- new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
- final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
- new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
-
- final NodeListener<FlowCapableNode> nodeListenerConfig =
- new SimplifiedConfigListener(
- cfgReactor,
- configSnapshot, operationalDao);
- final NodeListener<Node> nodeListenerOperational =
- new SimplifiedOperationalListener(operReactor, operationalSnapshot, configDao);
-
- try {
- SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
- dataTreeConfigChangeListener = looper1.loopUntilNoException(
- new Callable<ListenerRegistration<NodeListener>>() {
- @Override
- public ListenerRegistration<NodeListener> call() throws Exception {
- return dataService.registerDataTreeChangeListener(
- nodeConfigDataTreePath, nodeListenerConfig);
- }
- });
-
- SimpleTaskRetryLooper looper2 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
- dataTreeOperationalChangeListener = looper2.loopUntilNoException(
- new Callable<ListenerRegistration<NodeListener>>() {
- @Override
- public ListenerRegistration<NodeListener> call() throws Exception {
- return dataService.registerDataTreeChangeListener(
- nodeOperationalDataTreePath, nodeListenerOperational);
- }
- });
- } catch (final Exception e) {
- LOG.warn("FR-Sync node DataChange listener registration fail!", e);
- throw new IllegalStateException("FR-Sync startup fail!", e);
- }
+ final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
+ .setFlatBatchService(flatBatchService)
+ .setTableForwarder(tableForwarder);
+
+ final RetryRegistry retryRegistry = new RetryRegistry();
+
+ final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
+ final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
+ final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
+ new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+
+ final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+
+ final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
+ new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
+ final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
+ new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
+
+ final NodeListener<FlowCapableNode> nodeListenerConfig =
+ new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
+ final NodeListener<Node> nodeListenerOperational =
+ new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
+
+ try {
+ SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
+ dataTreeConfigChangeListener = looper1.loopUntilNoException(
+ new Callable<ListenerRegistration<NodeListener>>() {
+ @Override
+ public ListenerRegistration<NodeListener> call() throws Exception {
+ return dataService.registerDataTreeChangeListener(
+ nodeConfigDataTreePath, nodeListenerConfig);
+ }
+ });
+
+ SimpleTaskRetryLooper looper2 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
+ dataTreeOperationalChangeListener = looper2.loopUntilNoException(
+ new Callable<ListenerRegistration<NodeListener>>() {
+ @Override
+ public ListenerRegistration<NodeListener> call() throws Exception {
+ return dataService.registerDataTreeChangeListener(
+ nodeOperationalDataTreePath, nodeListenerOperational);
+ }
+ });
+ } catch (final Exception e) {
+ LOG.warn("FR-Sync node DataChange listener registration fail!", e);
+ throw new IllegalStateException("FR-Sync startup fail!", e);
}
LOG.info("ForwardingRulesSync has started.");
}