*/
package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
* Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
*/
public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
-
private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
- private FlowForwarder flowForwarder;
- private MeterForwarder meterForwarder;
- private GroupForwarder groupForwarder;
- private TableForwarder tableForwarder;
- private FlowCapableTransactionService transactionService;
+ private final FlowForwarder flowForwarder;
+ private final MeterForwarder meterForwarder;
+ private final GroupForwarder groupForwarder;
+ private final SendBarrier sendBarrier;
+
+ public SyncPlanPushStrategyIncrementalImpl(final FlowForwarder flowForwarder, final MeterForwarder meterForwarder,
+ final GroupForwarder groupForwarder, final SendBarrier sendBarrier) {
+ this.flowForwarder = requireNonNull(flowForwarder);
+ this.meterForwarder = requireNonNull(meterForwarder);
+ this.groupForwarder = requireNonNull(groupForwarder);
+ this.sendBarrier = requireNonNull(sendBarrier);
+ }
@Override
public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
- final SynchronizationDiffInput diffInput,
- final SyncCrudCounters counters) {
+ final SynchronizationDiffInput diffInput, final SyncCrudCounters counters) {
final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
// TODO enable table-update when ready
//resultVehicle = updateTableFeatures(nodeIdent, configTree);
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
- // Futures.asList Arrays.asList(input, output),
- // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
- // }
- return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
- }, MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
- MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
-
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
- }, MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
- MoreExecutors.directExecutor());
- resultVehicle = Futures.transformAsync(resultVehicle, input -> {
- // if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- // }
- return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
- }, MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
+ resultVehicle = Futures.transformAsync(resultVehicle,
+ input -> removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters),
+ MoreExecutors.directExecutor());
Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
- MoreExecutors.directExecutor());
+ MoreExecutors.directExecutor());
return resultVehicle;
}
ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
- final InstanceIdentifier<FlowCapableNode> nodeIdent,
- final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
- final SyncCrudCounters counters) {
+ final InstanceIdentifier<FlowCapableNode> nodeIdent,
+ final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox, final SyncCrudCounters counters) {
if (flowsInTablesSyncBox.isEmpty()) {
LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
return RpcResultBuilder.<Void>success().buildFuture();
MoreExecutors.directExecutor());
return Futures.transformAsync(singleVoidResult,
- ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+ ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
MoreExecutors.directExecutor());
}
}
} catch (IllegalStateException e) {
chainedResult = RpcResultBuilder.<Void>failed()
- .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+ .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
.buildFuture();
}
MoreExecutors.directExecutor());
return Futures.transformAsync(singleVoidResult,
- ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+ ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
MoreExecutors.directExecutor());
}
// // at
// // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
// // .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
-// // allResults.add(JdkFutureAdapters.listenInPoolThread(
-// // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
+// // allResults.add(
+// // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent));
// }
// }
// }
MoreExecutors.directExecutor());
return Futures.transformAsync(singleVoidResult,
- ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
+ ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
MoreExecutors.directExecutor());
}
return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
- PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
+ PathUtil.digNodePath(nodeIdent), sendBarrier), MoreExecutors.directExecutor());
}
ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
}
} catch (IllegalStateException e) {
chainedResult = RpcResultBuilder.<Void>failed()
- .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
+ .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
.buildFuture();
}
return chainedResult;
}
-
-
- public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
- this.flowForwarder = flowForwarder;
- return this;
- }
-
- public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
- this.tableForwarder = tableForwarder;
- return this;
- }
-
- public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
- this.meterForwarder = meterForwarder;
- return this;
- }
-
- public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
- this.groupForwarder = groupForwarder;
- return this;
- }
-
- public SyncPlanPushStrategyIncrementalImpl setTransactionService(
- final FlowCapableTransactionService transactionService) {
- this.transactionService = transactionService;
- return this;
- }
-
}