package org.opendaylight.openflowplugin.applications.frsync.impl;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;
-
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
-import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyIncrementalImpl;
+import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* Top provider of forwarding rules synchronization functionality.
*/
private final SalMeterService salMeterService;
private final SalTableService salTableService;
private final FlowCapableTransactionService transactionService;
+ private final SalFlatBatchService flatBatchService;
/** wildcard path to flow-capable-node augmentation of inventory node */
private static final InstanceIdentifier<FlowCapableNode> FLOW_CAPABLE_NODE_WC_PATH =
this.transactionService =
Preconditions.checkNotNull(rpcRegistry.getRpcService(FlowCapableTransactionService.class),
"RPC SalTableService not found.");
+ this.flatBatchService =
+ Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
+ "RPC SalFlatBatchService not found.");
nodeConfigDataTreePath =
new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
}
})
.build());
+
@Override
public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
final FlowForwarder flowForwarder = new FlowForwarder(salFlowService);
{
//TODO: make is switchable
- final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyIncrementalImpl()
- .setFlowForwarder(flowForwarder)
- .setGroupForwarder(groupForwarder)
- .setMeterForwarder(meterForwarder)
- .setTableForwarder(tableForwarder)
- .setTransactionService(transactionService);
+// final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyIncrementalImpl()
+// .setFlowForwarder(flowForwarder)
+// .setGroupForwarder(groupForwarder)
+// .setMeterForwarder(meterForwarder)
+// .setTableForwarder(tableForwarder)
+// .setTransactionService(transactionService);
+
+ final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
+ .setFlatBatchService(flatBatchService)
+ .setTableForwarder(tableForwarder);
final SyncReactorImpl syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl,
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
groupsToAddOrUpdate, metersToAddOrUpdate, flowsToAddOrUpdate,
flowsToRemove, metersToRemove, groupsToRemove);
+ counters.setStartNano(System.nanoTime());
final ListenableFuture<RpcResult<Void>> bootstrapResultFuture = RpcResultBuilder.<Void>success().buildFuture();
final ListenableFuture<RpcResult<Void>> resultVehicle = syncPlanPushStrategy.executeSyncStrategy(
bootstrapResultFuture, input, counters);
final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
- LOG.debug("sync-outcome[{}] (added/updated/removed): flow={}/{}/{}, meter={}/{}/{}, group={}/{}/{}",
+ LOG.debug("sync-outcome[{}] (added/updated/removed): flow={}/{}/{}, meter={}/{}/{}, group={}/{}/{}, took={} ms",
nodeId.getValue(),
flowCrudCounts.getAdded(),
flowCrudCounts.getUpdated(),
meterCrudCounts.getRemoved(),
groupCrudCounts.getAdded(),
groupCrudCounts.getUpdated(),
- groupCrudCounts.getRemoved()
+ groupCrudCounts.getRemoved(),
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - counters.getStartNano())
);
}
LOG.trace("Index of last batch step: {}", batchOrder);
final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
- .setNode(new NodeRef(diffInput.getNodeIdent()))
+ .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
.setExitOnFirstError(false) // TODO: propagate from input
.setBatch(batchBag)
.build();
final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
- ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-bulk"));
+ ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-batch"));
}
});
- Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"));
+ Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "flat-batch"));
return resultVehicle;
}
final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
int itemOrder = 0;
- for (Flow Flow : flowItemSyncBox.getItemsToPush()) {
- flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(Flow).setBatchOrder(itemOrder++).build());
+ for (Flow flow : flowItemSyncBox.getItemsToPush()) {
+ flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(flow)
+ .setBatchOrder(itemOrder++)
+ .setFlowId(flow.getId())
+ .build());
}
final Batch batch = new BatchBuilder()
.setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
final List<FlatBatchAddFlow> flatBatchAddFlowBag =
new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
int itemOrder = 0;
- for (Flow Flow : flowItemSyncBox.getItemsToPush()) {
- flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(Flow).setBatchOrder(itemOrder++).build());
+ for (Flow flow : flowItemSyncBox.getItemsToPush()) {
+ flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(flow)
+ .setBatchOrder(itemOrder++)
+ .setFlowId(flow.getId())
+ .build());
}
final Batch batch = new BatchBuilder()
.setBatchChoice(new FlatBatchAddFlowCaseBuilder()
final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
int itemOrder = 0;
- for (ItemSyncBox.ItemUpdateTuple<Flow> FlowUpdate : flowItemSyncBox.getItemsToUpdate()) {
+ for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
.setBatchOrder(itemOrder++)
- .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(FlowUpdate.getOriginal()).build())
- .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(FlowUpdate.getUpdated()).build())
+ .setFlowId(flowUpdate.getUpdated().getId())
+ .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
+ .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
.build());
}
final Batch batch = new BatchBuilder()
return batchOrder;
}
- public void setFlatBatchService(final SalFlatBatchService flatBatchService) {
+ public SyncPlanPushStrategyFlatBatchImpl setFlatBatchService(final SalFlatBatchService flatBatchService) {
this.flatBatchService = flatBatchService;
+ return this;
}
- public void setTableForwarder(final TableForwarder tableForwarder) {
+ public SyncPlanPushStrategyFlatBatchImpl setTableForwarder(final TableForwarder tableForwarder) {
this.tableForwarder = tableForwarder;
+ return this;
}
}
ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
try {
- groupCrudCounts.setRemoved(ReconcileUtil.countTotalAdds(groupsRemovalPlan));
+ groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
if (LOG.isDebugEnabled()) {
LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
try {
if (!groupsAddPlan.isEmpty()) {
final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
- groupCrudCounts.setAdded(ReconcileUtil.countTotalAdds(groupsAddPlan));
+ groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
if (LOG.isDebugEnabled()) {
return okToInstall;
}
- public static <E> int countTotalAdds(final List<ItemSyncBox<E>> groupsAddPlan) {
+ public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
int count = 0;
for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
count += groupItemSyncBox.getItemsToPush().size();
return count;
}
- public static <E> int countTotalUpdated(final List<ItemSyncBox<E>> groupsAddPlan) {
+ public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
int count = 0;
for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
count += groupItemSyncBox.getItemsToUpdate().size();
private final CrudCounts flowCrudCounts;
private final CrudCounts groupCrudCounts;
private final CrudCounts meterCrudCounts;
+ private long startNano;
public SyncCrudCounters() {
flowCrudCounts = new CrudCounts();
return meterCrudCounts;
}
+
+ public long getStartNano() {
+ return startNano;
+ }
+
+ public void setStartNano(final long startNano) {
+ this.startNano = startNano;
+ }
}
}
/**
- * covers {@link ReconcileUtil#countTotalUpdated(List)} too
+ * covers {@link ReconcileUtil#countTotalUpdated(Iterable)} too
*
* @throws Exception
*/
ItemSyncBox<String> syncBox1 = createSyncBox("a,b", "x,y,z");
syncPlan.add(syncBox1);
syncPlan.add(syncBox1);
- Assert.assertEquals(4, ReconcileUtil.countTotalAdds(syncPlan));
+ Assert.assertEquals(4, ReconcileUtil.countTotalPushed(syncPlan));
Assert.assertEquals(6, ReconcileUtil.countTotalUpdated(syncPlan));
}