OPNFLWPLUG-1075: Making Device Oper transactions atomic 32/85832/2
authorVenkataSatya Jonnadula <rsankar@luminanetworks.com>
Mon, 18 Nov 2019 14:22:24 +0000 (19:52 +0530)
committerVenkataSatya Jonnadula <rsankar@luminanetworks.com>
Mon, 25 Nov 2019 09:23:26 +0000 (14:53 +0530)
Signed-off-by: VenkataSatya Jonnadula <rsankar@luminanetworks.com>
Change-Id: I7b2ed09633367e01229c7ea83063525c688821e8

openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/TxFacade.java
openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain/TransactionChainManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java

index 79e033abbcd00ecc66ad8f5e378c35edadd4131e..1fb146848c31597b7bd9c21671160f2be444a58b 100644 (file)
@@ -63,4 +63,15 @@ public interface TxFacade {
      * @return is transaction chain manager enabled
      */
     boolean isTransactionsEnabled();
+
+    /**
+     * Method to acquire a write transaction lock to perform atomic transactions to MD-SAL.
+     */
+    void acquireWriteTransactionLock();
+
+    /**
+     * Method to release the acquired write transaction lock.
+     */
+    void releaseWriteTransactionLock();
+
 }
index 655d935584fad19be382a309db189b730896d9fa..949290252cbe0a1fd7f6b266acc2ea999a7ba7f9 100755 (executable)
@@ -17,6 +17,8 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
@@ -64,6 +66,7 @@ public class TransactionChainManager implements TransactionChainListener, AutoCl
 
     @GuardedBy("txLock")
     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
+    private ReadWriteLock readWriteTransactionLock = new ReentrantReadWriteLock();
 
     public TransactionChainManager(@NonNull final DataBroker dataBroker,
                                    @NonNull final String deviceIdentifier) {
@@ -369,4 +372,13 @@ public class TransactionChainManager implements TransactionChainListener, AutoCl
          */
         SHUTTING_DOWN
     }
+
+    public void acquireWriteTransactionLock() {
+        readWriteTransactionLock.writeLock().lock();
+    }
+
+    public void releaseWriteTransactionLock() {
+        readWriteTransactionLock.writeLock().unlock();
+    }
+
 }
index cf05cb1d279ba20960d5a471d531b2c4ea6837e5..b9ba54865411f0530962a6718973151d43fea283 100644 (file)
@@ -353,6 +353,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         String datapathId = deviceInfo.getDatapathId().toString().intern();
         queuedNotificationManager.submitNotification(datapathId, () -> {
             try {
+                acquireWriteTransactionLock();
                 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
                         .translate(portStatusMessage, getDeviceInfo(), null);
                 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
@@ -381,6 +382,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             } catch (final Exception e) {
                 LOG.warn("Error processing port status message for port {} on device {}",
                         portStatusMessage.getPortNo(), datapathId, e);
+            } finally {
+                releaseWriteTransactionLock();
             }
         });
     }
@@ -606,6 +609,16 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         return deviceInfo.getServiceIdentifier();
     }
 
+    @Override
+    public void acquireWriteTransactionLock() {
+        transactionChainManager.acquireWriteTransactionLock();
+    }
+
+    @Override
+    public void releaseWriteTransactionLock() {
+        transactionChainManager.releaseWriteTransactionLock();
+    }
+
     @Override
     public void close() {
         // Close all datastore registries and transactions
index b75f99b208f385b661e6e8f12573c53828996ebd..5296890d2caacca8d7f375bb37b7246ced506cfd 100755 (executable)
@@ -101,35 +101,43 @@ public final class StatisticsGatheringUtils {
             }), MoreExecutors.directExecutor());
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
                                              final TxFacade txFacade, final DeviceRegistry deviceRegistry,
                                              final DeviceInfo deviceInfo,
                                              final MultipartWriterProvider statisticsWriterProvider) {
         final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
                 .augmentation(FlowCapableNode.class);
+        try {
+            txFacade.acquireWriteTransactionLock();
+            switch (type) {
+                case OFPMPFLOW:
+                    deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
+                    deviceRegistry.getDeviceFlowRegistry().processMarks();
+                    break;
+                case OFPMPMETERCONFIG:
+                    deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
+                    deviceRegistry.getDeviceMeterRegistry().processMarks();
+                    break;
+                case OFPMPGROUPDESC:
+                    deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
+                    deviceRegistry.getDeviceGroupRegistry().processMarks();
+                    break;
+                default:
+                    // no operation
+            }
 
-        switch (type) {
-            case OFPMPFLOW:
-                deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
-                deviceRegistry.getDeviceFlowRegistry().processMarks();
-                break;
-            case OFPMPMETERCONFIG:
-                deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
-                deviceRegistry.getDeviceMeterRegistry().processMarks();
-                break;
-            case OFPMPGROUPDESC:
-                deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
-                deviceRegistry.getDeviceGroupRegistry().processMarks();
-                break;
-            default:
-                // no operation
-        }
-
-        if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
-            txFacade.submitTransaction();
+            if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
+                txFacade.submitTransaction();
 
-            LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
-            return true;
+                LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
+                return true;
+            }
+        } catch (Exception e) {
+            LOG.error("Exception while writing statistics to operational inventory for the device {}",
+                    deviceInfo.getLOGValue(), e);
+        } finally {
+            txFacade.releaseWriteTransactionLock();
         }
 
         LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);