Bug 5575 flat-bulk strategy wired 28/38328/7
authorAndrej Leitner <anleitne@cisco.com>
Tue, 3 May 2016 12:31:20 +0000 (14:31 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Tue, 24 May 2016 06:32:07 +0000 (08:32 +0200)
  - FR-sync currently using flat-batch strategy
  - renaming, time

Change-Id: Id59ff6c15ac25649df16db9b786b1d6b85b75806
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyFlatBatchImpl.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyIncrementalImpl.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SyncCrudCounters.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtilTest.java

index 0b1b529794d3354baa0cc50c8983e1ff490a85aa..57c404b6d8cca5c7a0913e960e1e8cb5f189aa12 100644 (file)
@@ -8,9 +8,11 @@
 
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.Callable;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -24,9 +26,10 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCa
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
-import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyIncrementalImpl;
+import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
@@ -40,10 +43,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * Top provider of forwarding rules synchronization functionality.
  */
@@ -59,6 +58,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
     private final SalMeterService salMeterService;
     private final SalTableService salTableService;
     private final FlowCapableTransactionService transactionService;
+    private final SalFlatBatchService flatBatchService;
 
     /** wildcard path to flow-capable-node augmentation of inventory node */
     private static final InstanceIdentifier<FlowCapableNode> FLOW_CAPABLE_NODE_WC_PATH =
@@ -93,6 +93,9 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         this.transactionService =
                 Preconditions.checkNotNull(rpcRegistry.getRpcService(FlowCapableTransactionService.class),
                         "RPC SalTableService not found.");
+        this.flatBatchService =
+                Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
+                        "RPC SalFlatBatchService not found.");
 
         nodeConfigDataTreePath =
                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
@@ -115,6 +118,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
                         }
                     })
                     .build());
+
     @Override
     public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
         final FlowForwarder flowForwarder = new FlowForwarder(salFlowService);
@@ -124,12 +128,16 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
 
         {
             //TODO: make is switchable
-            final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyIncrementalImpl()
-                    .setFlowForwarder(flowForwarder)
-                    .setGroupForwarder(groupForwarder)
-                    .setMeterForwarder(meterForwarder)
-                   .setTableForwarder(tableForwarder)
-                    .setTransactionService(transactionService);
+//            final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyIncrementalImpl()
+//                    .setFlowForwarder(flowForwarder)
+//                    .setGroupForwarder(groupForwarder)
+//                    .setMeterForwarder(meterForwarder)
+//                    .setTableForwarder(tableForwarder)
+//                    .setTransactionService(transactionService);
+
+            final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
+                    .setFlatBatchService(flatBatchService)
+                    .setTableForwarder(tableForwarder);
 
             final SyncReactorImpl syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
             final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl,
index b7bc472031ca1d511038d4ce4ab7d052d4dc60e0..de4819e11d79787886ffb527125f3890ea35ad60 100644 (file)
@@ -17,6 +17,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
@@ -86,6 +87,7 @@ public class SyncReactorImpl implements SyncReactor {
                 groupsToAddOrUpdate, metersToAddOrUpdate, flowsToAddOrUpdate,
                 flowsToRemove, metersToRemove, groupsToRemove);
 
+        counters.setStartNano(System.nanoTime());
         final ListenableFuture<RpcResult<Void>> bootstrapResultFuture = RpcResultBuilder.<Void>success().buildFuture();
         final ListenableFuture<RpcResult<Void>> resultVehicle = syncPlanPushStrategy.executeSyncStrategy(
                 bootstrapResultFuture, input, counters);
@@ -104,7 +106,7 @@ public class SyncReactorImpl implements SyncReactor {
                     final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
                     final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
                     final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
-                    LOG.debug("sync-outcome[{}] (added/updated/removed): flow={}/{}/{}, meter={}/{}/{}, group={}/{}/{}",
+                    LOG.debug("sync-outcome[{}] (added/updated/removed): flow={}/{}/{}, meter={}/{}/{}, group={}/{}/{}, took={} ms",
                             nodeId.getValue(),
                             flowCrudCounts.getAdded(),
                             flowCrudCounts.getUpdated(),
@@ -114,7 +116,8 @@ public class SyncReactorImpl implements SyncReactor {
                             meterCrudCounts.getRemoved(),
                             groupCrudCounts.getAdded(),
                             groupCrudCounts.getUpdated(),
-                            groupCrudCounts.getRemoved()
+                            groupCrudCounts.getRemoved(),
+                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - counters.getStartNano())
                     );
                 }
 
index 7db755ec54e7f6514e33edf5683fca7a20b6a3f3..c42b31dcfbcfbc93560c513bed027c7baa51d461 100644 (file)
@@ -120,7 +120,7 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                 LOG.trace("Index of last batch step: {}", batchOrder);
 
                 final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
-                        .setNode(new NodeRef(diffInput.getNodeIdent()))
+                        .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
                         .setExitOnFirstError(false) // TODO: propagate from input
                         .setBatch(batchBag)
                         .build();
@@ -128,11 +128,11 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                 final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
 
                 return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
-                        ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-bulk"));
+                        ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-batch"));
             }
         });
 
-        Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"));
+        Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "flat-batch"));
         return resultVehicle;
     }
 
@@ -148,8 +148,11 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                     final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
                     int itemOrder = 0;
-                    for (Flow Flow : flowItemSyncBox.getItemsToPush()) {
-                        flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(Flow).setBatchOrder(itemOrder++).build());
+                    for (Flow flow : flowItemSyncBox.getItemsToPush()) {
+                        flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(flow)
+                                .setBatchOrder(itemOrder++)
+                                .setFlowId(flow.getId())
+                                .build());
                     }
                     final Batch batch = new BatchBuilder()
                             .setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
@@ -332,8 +335,11 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                     final List<FlatBatchAddFlow> flatBatchAddFlowBag =
                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
                     int itemOrder = 0;
-                    for (Flow Flow : flowItemSyncBox.getItemsToPush()) {
-                        flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(Flow).setBatchOrder(itemOrder++).build());
+                    for (Flow flow : flowItemSyncBox.getItemsToPush()) {
+                        flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(flow)
+                                .setBatchOrder(itemOrder++)
+                                .setFlowId(flow.getId())
+                                .build());
                     }
                     final Batch batch = new BatchBuilder()
                             .setBatchChoice(new FlatBatchAddFlowCaseBuilder()
@@ -348,11 +354,12 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                     final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
                     int itemOrder = 0;
-                    for (ItemSyncBox.ItemUpdateTuple<Flow> FlowUpdate : flowItemSyncBox.getItemsToUpdate()) {
+                    for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
                         flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
                                 .setBatchOrder(itemOrder++)
-                                .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(FlowUpdate.getOriginal()).build())
-                                .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(FlowUpdate.getUpdated()).build())
+                                .setFlowId(flowUpdate.getUpdated().getId())
+                                .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
+                                .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
                                 .build());
                     }
                     final Batch batch = new BatchBuilder()
@@ -368,11 +375,13 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
         return batchOrder;
     }
 
-    public void setFlatBatchService(final SalFlatBatchService flatBatchService) {
+    public SyncPlanPushStrategyFlatBatchImpl setFlatBatchService(final SalFlatBatchService flatBatchService) {
         this.flatBatchService = flatBatchService;
+        return this;
     }
 
-    public void setTableForwarder(final TableForwarder tableForwarder) {
+    public SyncPlanPushStrategyFlatBatchImpl setTableForwarder(final TableForwarder tableForwarder) {
         this.tableForwarder = tableForwarder;
+        return this;
     }
 }
index d79c5fd6685505f37b5d1a8febfccf6a5ef1a0fa..4a56acb38b124625ef4dfd9828b864f6d5e7b238 100644 (file)
@@ -294,7 +294,7 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
 
         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
         try {
-            groupCrudCounts.setRemoved(ReconcileUtil.countTotalAdds(groupsRemovalPlan));
+            groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
             if (LOG.isDebugEnabled()) {
                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
@@ -476,7 +476,7 @@ public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy
         try {
             if (!groupsAddPlan.isEmpty()) {
                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
-                groupCrudCounts.setAdded(ReconcileUtil.countTotalAdds(groupsAddPlan));
+                groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
 
                 if (LOG.isDebugEnabled()) {
index e4a12499645583662d7556f48269089a43b98a87..e339ccdd4a7f181077a8e25bdf8dde17e7d9c753 100644 (file)
@@ -235,7 +235,7 @@ public class ReconcileUtil {
         return okToInstall;
     }
 
-    public static <E> int countTotalAdds(final List<ItemSyncBox<E>> groupsAddPlan) {
+    public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
         int count = 0;
         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
             count += groupItemSyncBox.getItemsToPush().size();
@@ -243,7 +243,7 @@ public class ReconcileUtil {
         return count;
     }
 
-    public static <E> int countTotalUpdated(final List<ItemSyncBox<E>> groupsAddPlan) {
+    public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
         int count = 0;
         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
             count += groupItemSyncBox.getItemsToUpdate().size();
index a832d2cc340c67703b88167794fb474d278b0b49..5e3cfe3bb1a7db43176506113397332a7612e050 100644 (file)
@@ -15,6 +15,7 @@ public class SyncCrudCounters {
     private final CrudCounts flowCrudCounts;
     private final CrudCounts groupCrudCounts;
     private final CrudCounts meterCrudCounts;
+    private long startNano;
 
     public SyncCrudCounters() {
         flowCrudCounts = new CrudCounts();
@@ -34,4 +35,12 @@ public class SyncCrudCounters {
         return meterCrudCounts;
     }
 
+
+    public long getStartNano() {
+        return startNano;
+    }
+
+    public void setStartNano(final long startNano) {
+        this.startNano = startNano;
+    }
 }
index ac474a3397964178f8cd23895d0f86b6a71888f1..b5d68931b916609f0763fd1d82452d87e0536181 100644 (file)
@@ -312,7 +312,7 @@ public class ReconcileUtilTest {
     }
 
     /**
-     * covers {@link ReconcileUtil#countTotalUpdated(List)} too
+     * covers {@link ReconcileUtil#countTotalUpdated(Iterable)} too
      *
      * @throws Exception
      */
@@ -322,7 +322,7 @@ public class ReconcileUtilTest {
         ItemSyncBox<String> syncBox1 = createSyncBox("a,b", "x,y,z");
         syncPlan.add(syncBox1);
         syncPlan.add(syncBox1);
-        Assert.assertEquals(4, ReconcileUtil.countTotalAdds(syncPlan));
+        Assert.assertEquals(4, ReconcileUtil.countTotalPushed(syncPlan));
         Assert.assertEquals(6, ReconcileUtil.countTotalUpdated(syncPlan));
     }