Merge "bug 3126 - shift to new notification service"
authormichal rehak <mirehak@cisco.com>
Wed, 6 May 2015 08:37:23 +0000 (08:37 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 6 May 2015 08:37:23 +0000 (08:37 +0000)
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/FlowHashFactory.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalTableServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MatchUtil.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManagerTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/registry/flow/FlowHashFactoryTest.java

index 7229a28eb5f5d4ae2f6759d9c45049c05b820b44..b7e8d6477a881a2290c3e675c2ef505f8f67d3c4 100644 (file)
@@ -131,7 +131,7 @@ public class DeviceContextImpl implements DeviceContext {
      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
      */
     void submitTransaction() {
-        txChainManager.enableCounter();
+        txChainManager.enableSubmit();
         txChainManager.submitTransaction();
     }
 
@@ -245,7 +245,7 @@ public class DeviceContextImpl implements DeviceContext {
                 //TODO : this is the point, where we can discover that add flow operation failed and where we should
                 //TODO : remove this flow from deviceFlowRegistry
                 final Error error = (Error) ofHeader;
-                final String message = "Operation on device failed";
+                final String message = "Operation on device failed with xid "+ofHeader.getXid()+".";
                 rpcResult = RpcResultBuilder
                         .<OfHeader>failed()
                         .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
index 4ae4d3e42ddc420568d84b64c1ba22c33d4bb628..f4e9c097e906dd9e97668b7b9d26ec3cee070dd2 100644 (file)
@@ -10,6 +10,9 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
@@ -22,6 +25,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
@@ -53,7 +57,7 @@ class TransactionChainManager implements TransactionChainListener {
     private WriteTransaction wTx;
     private Timeout submitTaskTime;
     private long nrOfActualTx;
-    private boolean counterIsEnabled;
+    private boolean submitIsEnabled;
 
     TransactionChainManager(@Nonnull final DataBroker dataBroker,
                             @Nonnull final HashedWheelTimer hashedWheelTimer,
@@ -70,32 +74,41 @@ class TransactionChainManager implements TransactionChainListener {
 
 
     public void commitOperationsGatheredInOneTransaction(){
-        enableCounter();
+        enableSubmit();
         submitTransaction();
     }
     public void startGatheringOperationsToOneTransaction(){
-        counterIsEnabled = false;
+        submitIsEnabled = false;
     }
 
     synchronized <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
                                                                 final InstanceIdentifier<T> path, final T data) {
-        if (wTx == null) {
-            wTx = txChainFactory.newWriteOnlyTransaction();
-        }
-        wTx.put(store, path, data);
-        if (counterIsEnabled) {
+        try {
+            WriteTransaction writeTx = getTransactionSafely();
+            writeTx.put(store, path, data);
             countTxInAndCommit();
+        } catch (Exception e) {
+            LOG.warn("failed to put into writeOnlyTransaction: {}", e.getMessage());
+            LOG.trace("failed to put into writeOnlyTransaction.. ", e);
         }
     }
 
-    synchronized <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
-                                                                          final InstanceIdentifier<T> path) {
+    private WriteTransaction getTransactionSafely() {
         if (wTx == null) {
             wTx = txChainFactory.newWriteOnlyTransaction();
         }
-        wTx.delete(store, path);
-        if (counterIsEnabled) {
+        return wTx;
+    }
+
+    synchronized <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+                                                                          final InstanceIdentifier<T> path) {
+        try {
+            WriteTransaction writeTx = getTransactionSafely();
+            writeTx.delete(store, path);
             countTxInAndCommit();
+        } catch (Exception e) {
+            LOG.warn("failed to put into writeOnlyTransaction [{}]: {}", e.getMessage());
+            LOG.trace("failed to put into writeOnlyTransaction.. ", e);
         }
     }
 
@@ -106,30 +119,62 @@ class TransactionChainManager implements TransactionChainListener {
         }
     }
 
+    synchronized void submitScheduledTransaction(Timeout timeout) {
+        if (timeout.isCancelled()) {
+            // zombie timer executed
+            return;
+        }
+
+        if (submitIsEnabled) {
+            submitTransaction();
+        } else {
+            LOG.info("transaction submit task will not be scheduled - submit block issued.");
+        }
+    }
+
     synchronized void submitTransaction() {
-        if (counterIsEnabled) {
-            if (wTx != null) {
+        if (submitIsEnabled) {
+            if (wTx != null && nrOfActualTx > 0) {
                 LOG.trace("submitting transaction, counter: {}", nrOfActualTx);
-                wTx.submit();
+                CheckedFuture<Void, TransactionCommitFailedException> submitResult = wTx.submit();
+                hookTimeExpenseCounter(submitResult, String.valueOf(wTx.getIdentifier()) + "::" + nrOfActualTx);
                 wTx = null;
                 nrOfActualTx = 0L;
             }
-            if (submitTaskTime != null && !submitTaskTime.isExpired()) {
+            if (submitTaskTime != null) {
+                // if possible then cancel current timer (even if being executed via timer)
                 submitTaskTime.cancel();
             }
             submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() {
                 @Override
                 public void run(final Timeout timeout) throws Exception {
-                    submitTransaction();
+                    submitScheduledTransaction(timeout);
                 }
             }, timerValue, TimeUnit.MILLISECONDS);
+
         } else {
-            LOG.info("Task will not be scheduled - submit block issued.");
+            LOG.trace("transaction not committed - submit block issued");
         }
     }
 
-    synchronized void enableCounter() {
-        counterIsEnabled = true;
+    private void hookTimeExpenseCounter(CheckedFuture<Void, TransactionCommitFailedException> submitResult, final String name) {
+        final long submitFiredTime = System.currentTimeMillis();
+        LOG.debug("submit of {} fired", name);
+        Futures.addCallback(submitResult, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                LOG.debug("submit of {} finished in {} ms", name, System.currentTimeMillis() - submitFiredTime);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.warn("transaction submit failed: {}", t.getMessage());
+            }
+        });
+    }
+
+    synchronized void enableSubmit() {
+        submitIsEnabled = true;
     }
 
     @Override
index ea0feeddd1a0144f5d789c90a0e6ad930844d336..9747d014409ea774bd0fd9f3616431c627bee453 100644 (file)
@@ -83,17 +83,17 @@ public class FlowHashFactory {
 
         @Override
         public short getTableId() {
-            return 0;
+            return tableId;
         }
 
         @Override
         public int getPriority() {
-            return 0;
+            return priority;
         }
 
         @Override
         public BigInteger getCookie() {
-            return null;
+            return cookie;
         }
     }
 }
index 5de219d7be71d66c26b9b6e82e4208b2c49b53a8..b0434a9ab0b1994fddd4f47a66da40e1feece6cd 100644 (file)
@@ -7,6 +7,23 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
+import java.util.ArrayList;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.TableFeaturesReplyConvertor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.table.features._case.MultipartReplyTableFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyTableFeaturesCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody;
 import java.math.BigInteger;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutputBuilder;
@@ -48,15 +65,16 @@ public class SalTableServiceImpl extends CommonService implements SalTableServic
 
     @Override
     public Future<RpcResult<UpdateTableOutput>> updateTable(final UpdateTableInput input) {
-        class FunctionImpl implements Function<DataCrate<List<MultipartReply>>,ListenableFuture<RpcResult<List<MultipartReply>>>> {
+        class FunctionImpl implements
+                Function<DataCrate<List<MultipartReply>>, ListenableFuture<RpcResult<List<MultipartReply>>>> {
 
             @Override
             public ListenableFuture<RpcResult<List<MultipartReply>>> apply(final DataCrate<List<MultipartReply>> data) {
-                messageSpy.spyMessage(input.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS);
+                messageSpy.spyMessage(input.getImplementedInterface(),
+                        MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS);
 
                 final SettableFuture<RpcResult<List<MultipartReply>>> result = SettableFuture.create();
 
-
                 final MultipartRequestTableFeaturesCaseBuilder caseBuilder = new MultipartRequestTableFeaturesCaseBuilder();
                 final MultipartRequestTableFeaturesBuilder requestBuilder = new MultipartRequestTableFeaturesBuilder();
                 final List<TableFeatures> ofTableFeatureList = TableFeaturesConvertor.toTableFeaturesRequest(input
@@ -80,7 +98,8 @@ public class SalTableServiceImpl extends CommonService implements SalTableServic
             }
         }
 
-        final ListenableFuture<RpcResult<List<MultipartReply>>> multipartFuture = handleServiceCall( PRIMARY_CONNECTION, new FunctionImpl());
+        final ListenableFuture<RpcResult<List<MultipartReply>>> multipartFuture = handleServiceCall(PRIMARY_CONNECTION,
+                new FunctionImpl());
         final SettableFuture<RpcResult<UpdateTableOutput>> finalFuture = SettableFuture.create();
 
         class CallBackImpl implements FutureCallback<RpcResult<List<MultipartReply>>> {
@@ -93,31 +112,74 @@ public class SalTableServiceImpl extends CommonService implements SalTableServic
                     final List<MultipartReply> multipartReplies = result.getResult();
                     if (multipartReplies.isEmpty()) {
                         LOGGER.debug("Multipart reply to table features request shouldn't be empty list.");
-                        finalFuture.set(RpcResultBuilder.<UpdateTableOutput>failed().withError(ErrorType.RPC, "Multipart reply list is empty.").build());
+                        finalFuture.set(RpcResultBuilder.<UpdateTableOutput> failed()
+                                .withError(ErrorType.RPC, "Multipart reply list is empty.").build());
                     } else {
                         final Long xid = multipartReplies.get(0).getXid();
-                        LOGGER.debug("OnSuccess, rpc result successful, multipart response for rpc update-table with xid {} obtained.",xid);
+                        LOGGER.debug(
+                                "OnSuccess, rpc result successful, multipart response for rpc update-table with xid {} obtained.",
+                                xid);
                         final UpdateTableOutputBuilder updateTableOutputBuilder = new UpdateTableOutputBuilder();
                         updateTableOutputBuilder.setTransactionId(new TransactionId(BigInteger.valueOf(xid)));
                         finalFuture.set(RpcResultBuilder.success(updateTableOutputBuilder.build()).build());
+                        writeResponseToOperationalDatastore(multipartReplies);
                     }
-                    //TODO: output could contain more interesting things then only xid.
-                    //(According to rfc output for table-update it is only xid)
-//                    for (MultipartReply multipartReply : result.getResult()) {
-//                        if (multipartReply.getType().equals(MultipartType.OFPMPTABLEFEATURES)) {
-//                        }
-//                    }
                 } else {
                     LOGGER.debug("OnSuccess, rpc result unsuccessful, multipart response for rpc update-table was unsuccessful.");
-                    finalFuture.set(RpcResultBuilder.<UpdateTableOutput>failed().withRpcErrors(result.getErrors()).build());
+                    finalFuture.set(RpcResultBuilder.<UpdateTableOutput> failed().withRpcErrors(result.getErrors())
+                            .build());
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
                 LOGGER.debug("Failure multipart response for table features request. Exception: {}", t);
-                finalFuture.set(RpcResultBuilder.<UpdateTableOutput>failed().withError(ErrorType.RPC, "Future error", t).build());
+                finalFuture.set(RpcResultBuilder.<UpdateTableOutput> failed()
+                        .withError(ErrorType.RPC, "Future error", t).build());
+            }
+
+            /**
+             * @param multipartReplies
+             */
+            private void writeResponseToOperationalDatastore(final List<MultipartReply> multipartReplies) {
+
+                final List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> salTableFeatures = convertToSalTableFeatures(multipartReplies);
+
+                final NodeId nodeId = deviceContext.getPrimaryConnectionContext().getNodeId();
+                final InstanceIdentifier<FlowCapableNode> flowCapableNodeII = InstanceIdentifier.create(Nodes.class)
+                        .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
+                for (org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures tableFeatureData : salTableFeatures) {
+                    final Short tableId = tableFeatureData.getTableId();
+                    KeyedInstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures, TableFeaturesKey> tableFeaturesII = flowCapableNodeII
+                            .child(Table.class, new TableKey(tableId))
+                            .child(org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures.class,
+                                    new TableFeaturesKey(tableId));
+                    deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableFeaturesII,
+                            tableFeatureData);
+                }
+
             }
+
+            private List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> convertToSalTableFeatures(
+                    final List<MultipartReply> multipartReplies) {
+                final List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> salTableFeaturesAll = new ArrayList<>();
+                for (MultipartReply multipartReply : multipartReplies) {
+                    if (multipartReply.getType().equals(MultipartType.OFPMPTABLEFEATURES)) {
+                        MultipartReplyBody multipartReplyBody = multipartReply.getMultipartReplyBody();
+                        if (multipartReplyBody instanceof MultipartReplyTableFeaturesCase) {
+                            MultipartReplyTableFeaturesCase tableFeaturesCase = ((MultipartReplyTableFeaturesCase) multipartReplyBody);
+                            MultipartReplyTableFeatures salTableFeatures = tableFeaturesCase
+                                    .getMultipartReplyTableFeatures();
+                            List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> salTableFeaturesPartial = TableFeaturesReplyConvertor
+                                    .toTableFeaturesReply(salTableFeatures);
+                            salTableFeaturesAll.addAll(salTableFeaturesPartial);
+                            LOGGER.debug("TableFeature {} for xid {}.", salTableFeatures, multipartReply.getXid());
+                        }
+                    }
+                }
+                return salTableFeaturesAll;
+            }
+
         }
 
         Futures.addCallback(multipartFuture, new CallBackImpl());
@@ -125,7 +187,6 @@ public class SalTableServiceImpl extends CommonService implements SalTableServic
         return finalFuture;
     }
 
-
     private MultipartRequestInputBuilder createMultipartHeader(final MultipartType multipart, final Long xid) {
         final MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
         mprInput.setType(multipart);
index 96a0ca4ebba70f3f12ea99641c83d20303696cfe..eaf1aa320ab379fce00d2c8ebc943d6fe150654f 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import com.sun.org.apache.xpath.internal.operations.Bool;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -24,14 +23,7 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext
 import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,21 +47,12 @@ public class StatisticsContextImpl implements StatisticsContext {
 
     }
 
-    private void pollFlowStatistics() {
-        final KeyedInstanceIdentifier<Node, NodeKey> nodeII = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(deviceContext.getPrimaryConnectionContext().getNodeId()));
-        final NodeRef nodeRef = new NodeRef(nodeII);
-        final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder =
-                new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-        builder.setNode(nodeRef);
-        //TODO : process data from result
-    }
-
     @Override
     public ListenableFuture<Boolean> gatherDynamicData() {
 
-
         final SettableFuture settableResultingFuture = SettableFuture.create();
-        ListenableFuture<Boolean> resultingFuture = settableResultingFuture ;
+        ListenableFuture<Boolean> resultingFuture = settableResultingFuture;
+
 
         if (ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
             final DeviceState devState = deviceContext.getDeviceState();
@@ -99,6 +82,7 @@ public class StatisticsContextImpl implements StatisticsContext {
                     }
                     settableResultingFuture.set(new Boolean(atLeastOneSuccess));
                 }
+
                 @Override
                 public void onFailure(final Throwable throwable) {
                     settableResultingFuture.setException(throwable);
index 86cb2eecf68e94b619a42933f8dfbc3cee251aee..6e99aa76540234515686d309b530fc60f59858cb 100644 (file)
@@ -36,6 +36,12 @@ public class StatisticsManagerImpl implements StatisticsManager {
 
     private ConcurrentHashMap<DeviceContext, StatisticsContext> contexts = new ConcurrentHashMap();
 
+    private final TimeCounter timeCounter = new TimeCounter();
+
+    private static final long basicTimerDelay = 3000;
+    private static long currentTimerDelay = basicTimerDelay;
+    private static long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
+
     @Override
     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
         deviceInitPhaseHandler = handler;
@@ -78,27 +84,49 @@ public class StatisticsManagerImpl implements StatisticsManager {
     }
 
     private void pollStatistics() {
-        for (final StatisticsContext statisticsContext : contexts.values()) {
-            ListenableFuture deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
-            Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() {
-                @Override
-                public void onSuccess(final Object o) {
-                    //nothing to do here
-                }
+        try {
+            timeCounter.markStart();
+            for (final StatisticsContext statisticsContext : contexts.values()) {
+                ListenableFuture deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
+                Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback() {
+                    @Override
+                    public void onSuccess(final Object o) {
+                        timeCounter.addTimeMark();
+                    }
 
-                @Override
-                public void onFailure(final Throwable throwable) {
-                    LOG.info("Statistics gathering for single node was not successful.");
-                }
-            });
+                    @Override
+                    public void onFailure(final Throwable throwable) {
+                        timeCounter.addTimeMark();
+                        LOG.info("Statistics gathering for single node was not successful: {}", throwable.getMessage());
+                        LOG.debug("Statistics gathering for single node was not successful.. ", throwable);
+                    }
+                });
+            }
+        } finally {
+            calculateTimerDelay();
+            if (null != hashedWheelTimer) {
+                hashedWheelTimer.newTimeout(new TimerTask() {
+                    @Override
+                    public void run(final Timeout timeout) throws Exception {
+                        pollStatistics();
+                    }
+                }, currentTimerDelay, TimeUnit.MILLISECONDS);
+            }
         }
-        if (null != hashedWheelTimer) {
-            hashedWheelTimer.newTimeout(new TimerTask() {
-                @Override
-                public void run(final Timeout timeout) throws Exception {
-                    pollStatistics();
-                }
-            }, 3000, TimeUnit.MILLISECONDS);
+    }
+
+    private void calculateTimerDelay() {
+        long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
+        int numberOfDevices = contexts.size();
+        if ((averageStatisticsGatheringTime * numberOfDevices) > currentTimerDelay) {
+            currentTimerDelay *= 2;
+            if (currentTimerDelay > maximumTimerDelay) {
+                currentTimerDelay = maximumTimerDelay;
+            }
+        } else {
+            if (currentTimerDelay > basicTimerDelay) {
+                currentTimerDelay /= 2;
+            }
         }
     }
 
@@ -115,4 +143,30 @@ public class StatisticsManagerImpl implements StatisticsManager {
             }
         }
     }
+
+    private final class TimeCounter {
+        private long beginningOfTime;
+        private long delta;
+        private int marksCount = 0;
+
+        public void markStart() {
+            beginningOfTime = System.currentTimeMillis();
+            delta = 0;
+            marksCount = 0;
+        }
+
+        public void addTimeMark() {
+            delta += System.currentTimeMillis() - beginningOfTime;
+            marksCount++;
+        }
+
+        public long getAverageTimeBetweenMarks() {
+            long average = 0;
+            if (marksCount > 0) {
+                average = delta / marksCount;
+            }
+            return average;
+        }
+
+    }
 }
index f855c2ed858394fab326a93cc02a507322ae7d07..9a9738f1cc18c4d188868531db9b14eda3a30f3c 100644 (file)
@@ -43,7 +43,7 @@ public final class MatchUtil {
         matchV10Builder.setNwTos(zeroShort);
         matchV10Builder.setTpDst(zeroInteger);
         matchV10Builder.setTpSrc(zeroInteger);
-        FlowWildcardsV10 flowWildcardsV10 = new FlowWildcardsV10(false, false, false, false, false, false, false, false, false, false);
+        FlowWildcardsV10 flowWildcardsV10 = new FlowWildcardsV10(true, true, true, true, true, true, true, true, true, true);
         matchV10Builder.setWildcards(flowWildcardsV10);
         return matchV10Builder;
     }
index 9a6bb6322cbdb4719ed175282a56765d07d799c4..88c6a6211887b64f8388e54c6513323784104948 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.openflowplugin.impl.device;
 
+import com.google.common.util.concurrent.Futures;
 import io.netty.util.HashedWheelTimer;
 import org.junit.After;
 import org.junit.Before;
@@ -24,6 +25,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -61,6 +63,8 @@ public class TransactionChainManagerTest {
 
         nodeId = new NodeId("h2g2:42");
         path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
+
+        Mockito.when(writeTx.submit()).thenReturn(Futures.<Void, TransactionCommitFailedException>immediateCheckedFuture(null));
     }
 
     @After
@@ -80,17 +84,18 @@ public class TransactionChainManagerTest {
     @Test
     public void testSubmitTransaction() throws Exception {
         final Node data = new NodeBuilder().setId(nodeId).build();
-        txChainManager.enableCounter();
+        txChainManager.enableSubmit();
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data);
         txChainManager.submitTransaction();
 
         Mockito.verify(txChain).newWriteOnlyTransaction();
         Mockito.verify(writeTx).put(LogicalDatastoreType.CONFIGURATION, path, data);
         Mockito.verify(writeTx).submit();
+        Mockito.verify(writeTx).getIdentifier();
     }
 
     /**
-     * test of {@link TransactionChainManager#enableCounter()}: no submit - counter is not active
+     * test of {@link TransactionChainManager#enableSubmit()}: no submit - counter is not active
      * @throws Exception
      */
     @Test
@@ -104,12 +109,12 @@ public class TransactionChainManagerTest {
     }
 
     /**
-     * test of {@link TransactionChainManager#enableCounter()}: submit - after counter activated
+     * test of {@link TransactionChainManager#enableSubmit()}: submit - after counter activated
      * @throws Exception
      */
     @Test
     public void testEnableCounter2() throws Exception {
-        txChainManager.enableCounter();
+        txChainManager.enableSubmit();
 
         final Node data = new NodeBuilder().setId(nodeId).build();
         txChainManager.writeToTransaction(LogicalDatastoreType.CONFIGURATION, path, data);
@@ -125,19 +130,29 @@ public class TransactionChainManagerTest {
         Mockito.verify(txChain, Mockito.times(2)).newWriteOnlyTransaction();
         Mockito.verify(writeTx, Mockito.times(4)).put(LogicalDatastoreType.CONFIGURATION, path, data);
         Mockito.verify(writeTx, Mockito.times(2)).submit();
+        Mockito.verify(writeTx, Mockito.times(2)).getIdentifier();
     }
 
     @Test
     public void testOnTransactionChainFailed() throws Exception {
+        txChainManager.onTransactionChainFailed(transactionChain, Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class));
+
+        Mockito.verify(txChain).close();
+        Mockito.verify(dataBroker, Mockito.times(2)).createTransactionChain(txChainManager);
+    }
+
+    @Test
+    public void testOnTransactionChainSuccessful() throws Exception {
         txChainManager.onTransactionChainSuccessful(transactionChain);
         // NOOP
+        Mockito.verifyZeroInteractions(transactionChain);
     }
 
     @Test
-    public void testOnTransactionChainSuccessful() throws Exception {
-        txChainManager.onTransactionChainFailed(transactionChain, Mockito.mock(AsyncTransaction.class), Mockito.mock(Throwable.class));
+    public void testAddDeleteOperationTotTxChain() throws Exception {
+        txChainManager.addDeleteOperationTotTxChain(LogicalDatastoreType.CONFIGURATION, path);
 
-        Mockito.verify(txChain).close();
-        Mockito.verify(dataBroker, Mockito.times(2)).createTransactionChain(txChainManager);
+        Mockito.verify(txChain).newWriteOnlyTransaction();
+        Mockito.verify(writeTx).delete(LogicalDatastoreType.CONFIGURATION, path);
     }
 }
\ No newline at end of file
index f13d30185021be0a58dbc78bad0f1ab51f00ce5b..13210aa328cc3af35d9ccf3b22e5142a31b3f340 100644 (file)
@@ -93,9 +93,8 @@ public class FlowHashFactoryTest {
 
         HashSet<FlowHash> flowHashs = new HashSet();
         for (FlowAndStatisticsMapList item : flowStats.getFlowAndStatisticsMapList()) {
-            FlowHash flowHash = FlowHashFactory.create(item, deviceContext);
-            flowHashs.add(flowHash);
-            flowHashs.add(flowHash);
+            flowHashs.add(FlowHashFactory.create(item, deviceContext));
+            flowHashs.add(FlowHashFactory.create(item, deviceContext));
         }
         assertEquals(3, flowHashs.size());
     }