import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Range;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import javax.annotation.Nullable;
+import javax.annotation.Nonnull;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
counters.getGroupCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getGroupsToRemove()));
- counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getFlowsToAddOrUpdate().values()));
- counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getFlowsToAddOrUpdate().values()));
+ counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(
+ diffInput.getFlowsToAddOrUpdate().values()));
+ counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(
+ diffInput.getFlowsToAddOrUpdate().values()));
counters.getFlowCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getFlowsToRemove().values()));
counters.getMeterCrudCounts().setAdded(diffInput.getMetersToAddOrUpdate().getItemsToPush().size());
// TODO enable table-update when ready
//resultVehicle = updateTableFeatures(nodeIdent, configTree);
- resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
- @Override
- public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
- final List<Batch> batchBag = new ArrayList<>();
- int batchOrder = 0;
+ resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+ final List<Batch> batchBag = new ArrayList<>();
+ int batchOrder = 0;
- batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
- batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
- batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
+ batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
+ batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
+ batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
- batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
- batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
- batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
+ batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
+ batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
+ batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
- LOG.trace("Index of last batch step: {}", batchOrder);
+ LOG.trace("Index of last batch step: {}", batchOrder);
- final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
- .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
- // TODO: propagate from input
- .setExitOnFirstError(false)
- .setBatch(batchBag)
- .build();
+ final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
+ .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
+ // TODO: propagate from input
+ .setExitOnFirstError(false)
+ .setBatch(batchBag)
+ .build();
- final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
+ final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture =
+ flatBatchService.processFlatBatch(flatBatchInput);
- if (LOG.isDebugEnabled()) {
- Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
- createCounterCallback(batchBag, batchOrder, counters), MoreExecutors.directExecutor());
- }
-
- return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
- ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-batch"));
+ if (LOG.isDebugEnabled()) {
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
+ createCounterCallback(batchBag, batchOrder, counters), MoreExecutors.directExecutor());
}
- });
+
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
+ ReconcileUtil.createRpcResultToVoidFunction("flat-batch"),
+ MoreExecutors.directExecutor());
+ }, MoreExecutors.directExecutor());
return resultVehicle;
}
final SyncCrudCounters counters) {
return new FutureCallback<RpcResult<ProcessFlatBatchOutput>>() {
@Override
- public void onSuccess(@Nullable final RpcResult<ProcessFlatBatchOutput> result) {
- if (!result.isSuccessful() && result.getResult() != null && !result.getResult().getBatchFailure().isEmpty()) {
+ public void onSuccess(@Nonnull final RpcResult<ProcessFlatBatchOutput> result) {
+ if (!result.isSuccessful() && result.getResult() != null
+ && !result.getResult().getBatchFailure().isEmpty()) {
Map<Range<Integer>, Batch> batchMap = mapBatchesToRanges(inputBatchBag, failureIndexLimit);
decrementBatchFailuresCounters(result.getResult().getBatchFailure(), batchMap, counters);
}
}
@Override
- public void onFailure(final Throwable t) {
+ public void onFailure(final Throwable failure) {
counters.resetAll();
}
};
}
@VisibleForTesting
- static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
+ static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder,
+ final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
// process flow remove
int order = batchOrder;
if (flowItemSyncTableMap != null) {
}
@VisibleForTesting
- static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
+ static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder,
+ final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
// process group add+update
int order = batchOrder;
if (groupsToAddOrUpdate != null) {
new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
int itemOrder = 0;
for (Group group : groupItemSyncBox.getItemsToPush()) {
- flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group).setBatchOrder(itemOrder++).build());
+ flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group)
+ .setBatchOrder(itemOrder++).build());
}
final Batch batch = new BatchBuilder()
.setBatchChoice(new FlatBatchAddGroupCaseBuilder()
int itemOrder = 0;
for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
- .setBatchOrder(itemOrder++)
- .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
- .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
- .build());
+ .setBatchOrder(itemOrder++)
+ .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
+ .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
+ .build());
}
final Batch batch = new BatchBuilder()
.setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
}
@VisibleForTesting
- static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
+ static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder,
+ final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
// process group add+update
int order = batchOrder;
if (groupsToRemoveOrUpdate != null) {
new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
int itemOrder = 0;
for (Group group : groupItemSyncBox.getItemsToPush()) {
- flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group).setBatchOrder(itemOrder++).build());
+ flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group)
+ .setBatchOrder(itemOrder++).build());
}
final Batch batch = new BatchBuilder()
.setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
}
@VisibleForTesting
- static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
+ static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder,
+ final ItemSyncBox<Meter> meterItemSyncBox) {
// process meter add+update
int order = batchOrder;
if (meterItemSyncBox != null) {
}
@VisibleForTesting
- static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
+ static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder,
+ final ItemSyncBox<Meter> meterItemSyncBox) {
// process meter remove
int order = batchOrder;
if (meterItemSyncBox != null && !meterItemSyncBox.getItemsToPush().isEmpty()) {
}
@VisibleForTesting
- static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
+ static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder,
+ final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
// process flow add+update
int order = batchOrder;
if (flowItemSyncTableMap != null) {
int itemOrder = 0;
for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
- .setBatchOrder(itemOrder++)
- .setFlowId(flowUpdate.getUpdated().getId())
- .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
- .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
- .build());
+ .setBatchOrder(itemOrder++)
+ .setFlowId(flowUpdate.getUpdated().getId())
+ .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
+ .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
+ .build());
}
final Batch batch = new BatchBuilder()
.setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()