Use managed transactions in statistics-impl 77/70777/4
authorStephen Kitt <skitt@redhat.com>
Wed, 11 Apr 2018 12:48:04 +0000 (14:48 +0200)
committerSam Hague <shague@redhat.com>
Tue, 1 May 2018 00:46:40 +0000 (00:46 +0000)
This also enforces restrictions on newReadWriteTransaction and
newWriteOnlyTransaction calls, to prevent new code introducing
unmanaged transactions.

Change-Id: Ic4cb8f955fe7942214a740115934014c0815524c
Signed-off-by: Stephen Kitt <skitt@redhat.com>
statistics/impl/pom.xml
statistics/impl/src/main/java/org/opendaylight/netvirt/statistics/StatisticsImpl.java

index f57d86e4a09cc5ff08d1ae94f03a25fcdee3e44f..30e10f1feb6e6483a17ae6519023061b9379e4e8 100644 (file)
@@ -10,9 +10,9 @@ at http://www.eclipse.org/legal/epl-v10.html
 
     <parent>
         <groupId>org.opendaylight.netvirt</groupId>
-        <artifactId>binding-parent</artifactId>
+        <artifactId>managed-tx-parent</artifactId>
         <version>0.7.0-SNAPSHOT</version>
-        <relativePath>../../commons/binding-parent</relativePath>
+        <relativePath>../../commons/managed-tx-parent</relativePath>
     </parent>
 
     <artifactId>statistics-impl</artifactId>
index 6a8798101633148d1423fc02d94a30410a32c82b..fbe04c5ea53ad3ddaf81791ad50d77574fd1d239 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -33,12 +34,15 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.mdsalutil.MatchInfoBase;
 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
 import org.opendaylight.infrautils.counters.api.OccurenceCounter;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.statistics.api.ICountersInterfaceChangeHandler;
 import org.opendaylight.netvirt.vpnmanager.api.InterfaceUtils;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.interfaces.rev140508.interfaces.state.Interface;
@@ -105,6 +109,7 @@ import org.slf4j.LoggerFactory;
 public class StatisticsImpl implements StatisticsService, ICountersInterfaceChangeHandler {
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsImpl.class);
     private final DataBroker db;
+    private final ManagedNewTransactionRunner txRunner;
     private final CounterRetriever counterRetriever;
     private final IInterfaceManager interfaceManager;
     private final IMdsalApiManager mdsalApiManager;
@@ -113,6 +118,7 @@ public class StatisticsImpl implements StatisticsService, ICountersInterfaceChan
     public StatisticsImpl(DataBroker db, CounterRetriever counterRetriever, IInterfaceManager interfaceManager,
             IMdsalApiManager mdsalApiManager, IdManagerService idManagerService) {
         this.db = db;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(db);
         this.counterRetriever = counterRetriever;
         this.interfaceManager = interfaceManager;
         this.mdsalApiManager = mdsalApiManager;
@@ -215,7 +221,6 @@ public class StatisticsImpl implements StatisticsService, ICountersInterfaceChan
             AcquireElementCountersRequestHandlerInput input) {
         AcquireElementCountersRequestHandlerOutputBuilder aecrhob =
                 new AcquireElementCountersRequestHandlerOutputBuilder();
-        ReadWriteTransaction transaction = db.newReadWriteTransaction();
         UUID randomNumber = UUID.randomUUID();
         Integer intRequestKey = allocateId(randomNumber.toString());
         if (intRequestKey == null) {
@@ -225,66 +230,68 @@ public class StatisticsImpl implements StatisticsService, ICountersInterfaceChan
                     .withError(ErrorType.APPLICATION, "failed generating unique request identifier").buildFuture();
         }
         String requestKey = String.valueOf(intRequestKey);
-
-        try {
-            if (input.getIncomingTraffic() != null) {
-                CheckedFuture<Optional<EgressElementCountersRequestConfig>, ReadFailedException> eecrc =
-                        transaction.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.EECRC_IDENTIFIER);
-                Optional<EgressElementCountersRequestConfig> eecrcOpt = eecrc.get();
-                if (!eecrcOpt.isPresent()) {
-                    LOG.warn("failed creating incoming traffic counter request data container in DB");
-                    StatisticsPluginImplCounters.failed_creating_egress_counter_data_config.inc();
-                    return RpcResultBuilder.<AcquireElementCountersRequestHandlerOutput>failed()
-                            .withError(ErrorType.APPLICATION,
-                                    "failed creating egress counter request data container in DB")
-                            .buildFuture();
-                }
-                if (!isIdenticalCounterRequestExist(input.getPortId(), ElementCountersDirection.EGRESS.toString(),
-                        input.getIncomingTraffic().getFilters(), eecrcOpt.get().getCounterRequests())) {
-                    installCounterSpecificRules(input.getPortId(), getLportTag(input.getPortId()),
-                            getDpn(input.getPortId()), ElementCountersDirection.EGRESS,
-                            input.getIncomingTraffic().getFilters());
+        SettableFuture<RpcResult<AcquireElementCountersRequestHandlerOutput>> result = SettableFuture.create();
+
+        ListenableFutures.addErrorLogging(
+            txRunner.callWithNewReadWriteTransactionAndSubmit(transaction -> {
+                if (input.getIncomingTraffic() != null) {
+                    Optional<EgressElementCountersRequestConfig> eecrcOpt =
+                            transaction.read(LogicalDatastoreType.CONFIGURATION,
+                                    CountersServiceUtils.EECRC_IDENTIFIER).checkedGet();
+                    if (!eecrcOpt.isPresent()) {
+                        LOG.warn("failed creating incoming traffic counter request data container in DB");
+                        StatisticsPluginImplCounters.failed_creating_egress_counter_data_config.inc();
+                        result.setFuture(RpcResultBuilder.<AcquireElementCountersRequestHandlerOutput>failed()
+                                .withError(ErrorType.APPLICATION,
+                                        "failed creating egress counter request data container in DB")
+                                .buildFuture());
+                        return;
+                    }
+                    if (!isIdenticalCounterRequestExist(input.getPortId(), ElementCountersDirection.EGRESS.toString(),
+                            input.getIncomingTraffic().getFilters(), eecrcOpt.get().getCounterRequests())) {
+                        installCounterSpecificRules(input.getPortId(), getLportTag(input.getPortId()),
+                                getDpn(input.getPortId()), ElementCountersDirection.EGRESS,
+                                input.getIncomingTraffic().getFilters());
+                    }
+                    putEgressElementCounterRequestInConfig(input, ElementCountersDirection.EGRESS, transaction,
+                            requestKey, CountersServiceUtils.EECRC_IDENTIFIER, eecrcOpt, randomNumber.toString());
+
+                    aecrhob.setIncomingTrafficHandler(requestKey);
+
+                    bindCountersServiceIfUnbound(input.getPortId(), ElementCountersDirection.EGRESS);
                 }
-                putEgressElementCounterRequestInConfig(input, ElementCountersDirection.EGRESS, transaction, requestKey,
-                        CountersServiceUtils.EECRC_IDENTIFIER, eecrcOpt, randomNumber.toString());
-                transaction.submit();
-                aecrhob.setIncomingTrafficHandler(requestKey);
 
-                bindCountersServiceIfUnbound(input.getPortId(), ElementCountersDirection.EGRESS);
-            }
-            if (input.getOutgoingTraffic() != null) {
-                transaction = db.newReadWriteTransaction();
-                CheckedFuture<Optional<IngressElementCountersRequestConfig>, ReadFailedException> iecrc =
-                        transaction.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.IECRC_IDENTIFIER);
-                Optional<IngressElementCountersRequestConfig> iecrcOpt = iecrc.get();
-                if (!iecrcOpt.isPresent()) {
-                    LOG.warn("failed creating outgoing traffc counter request data container in DB");
-                    StatisticsPluginImplCounters.failed_creating_ingress_counter_data_config.inc();
-                    return RpcResultBuilder.<AcquireElementCountersRequestHandlerOutput>failed()
-                            .withError(ErrorType.APPLICATION,
-                                    "failed creating ingress counter request data container in DB")
-                            .buildFuture();
-                }
-                if (!isIdenticalCounterRequestExist(input.getPortId(), ElementCountersDirection.INGRESS.toString(),
-                        input.getOutgoingTraffic().getFilters(), iecrcOpt.get().getCounterRequests())) {
-                    installCounterSpecificRules(input.getPortId(), getLportTag(input.getPortId()),
-                            getDpn(input.getPortId()), ElementCountersDirection.INGRESS,
-                            input.getOutgoingTraffic().getFilters());
+                if (input.getOutgoingTraffic() != null) {
+                    Optional<IngressElementCountersRequestConfig> iecrcOpt =
+                            transaction.read(LogicalDatastoreType.CONFIGURATION,
+                                    CountersServiceUtils.IECRC_IDENTIFIER).checkedGet();
+                    if (!iecrcOpt.isPresent()) {
+                        LOG.warn("failed creating outgoing traffc counter request data container in DB");
+                        StatisticsPluginImplCounters.failed_creating_ingress_counter_data_config.inc();
+                        result.setFuture(RpcResultBuilder.<AcquireElementCountersRequestHandlerOutput>failed()
+                                .withError(ErrorType.APPLICATION,
+                                        "failed creating ingress counter request data container in DB")
+                                .buildFuture());
+                        return;
+                    }
+                    if (!isIdenticalCounterRequestExist(input.getPortId(), ElementCountersDirection.INGRESS.toString(),
+                            input.getOutgoingTraffic().getFilters(), iecrcOpt.get().getCounterRequests())) {
+                        installCounterSpecificRules(input.getPortId(), getLportTag(input.getPortId()),
+                                getDpn(input.getPortId()), ElementCountersDirection.INGRESS,
+                                input.getOutgoingTraffic().getFilters());
+                    }
+                    putIngressElementCounterRequestInConfig(input, ElementCountersDirection.INGRESS, transaction,
+                            requestKey, CountersServiceUtils.IECRC_IDENTIFIER, iecrcOpt, randomNumber.toString());
+
+                    aecrhob.setIncomingTrafficHandler(requestKey);
+
+                    bindCountersServiceIfUnbound(input.getPortId(), ElementCountersDirection.INGRESS);
+
+                    result.setFuture(RpcResultBuilder.success(aecrhob.build()).buildFuture());
                 }
-                putIngressElementCounterRequestInConfig(input, ElementCountersDirection.INGRESS, transaction,
-                        requestKey, CountersServiceUtils.IECRC_IDENTIFIER, iecrcOpt, randomNumber.toString());
-                transaction.submit();
-                aecrhob.setIncomingTrafficHandler(requestKey);
-
-                bindCountersServiceIfUnbound(input.getPortId(), ElementCountersDirection.INGRESS);
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.warn("failed to get counter request data from DB");
-            return RpcResultBuilder.<AcquireElementCountersRequestHandlerOutput>failed()
-                    .withError(ErrorType.APPLICATION, "failed to get counter request data from DB").buildFuture();
-        }
+            }), LOG, "Error acquiring element counters");
 
-        return RpcResultBuilder.success(aecrhob.build()).buildFuture();
+        return result;
     }
 
     @Override
@@ -297,49 +304,46 @@ public class StatisticsImpl implements StatisticsService, ICountersInterfaceChan
                 InstanceIdentifier.builder(EgressElementCountersRequestConfig.class)
                         .child(CounterRequests.class, new CounterRequestsKey(input.getHandler())).build();
 
-        ReadWriteTransaction tx = db.newReadWriteTransaction();
-        CheckedFuture<Optional<CounterRequests>, ReadFailedException> ingressRequestData =
-                tx.read(LogicalDatastoreType.CONFIGURATION, ingressPath);
-        CheckedFuture<Optional<CounterRequests>, ReadFailedException> egressRequestData =
-                tx.read(LogicalDatastoreType.CONFIGURATION, egressPath);
-        CheckedFuture<Optional<IngressElementCountersRequestConfig>, ReadFailedException> iecrc =
-                tx.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.IECRC_IDENTIFIER);
-        CheckedFuture<Optional<EgressElementCountersRequestConfig>, ReadFailedException> eecrc =
-                tx.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.EECRC_IDENTIFIER);
-
-        try {
-            Optional<IngressElementCountersRequestConfig> iecrcOpt = iecrc.get();
-            Optional<EgressElementCountersRequestConfig> eecrcOpt = eecrc.get();
+        SettableFuture<RpcResult<ReleaseElementCountersRequestHandlerOutput>> result = SettableFuture.create();
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            Optional<IngressElementCountersRequestConfig> iecrcOpt =
+                    tx.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.IECRC_IDENTIFIER).checkedGet();
+            Optional<EgressElementCountersRequestConfig> eecrcOpt =
+                    tx.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.EECRC_IDENTIFIER).checkedGet();
             if (!iecrcOpt.isPresent() || !eecrcOpt.isPresent()) {
                 LOG.warn("Couldn't read element counters config data from DB");
                 StatisticsPluginImplCounters.failed_reading_counter_data_from_config.inc();
-                return RpcResultBuilder.<ReleaseElementCountersRequestHandlerOutput>failed()
+                result.setFuture(RpcResultBuilder.<ReleaseElementCountersRequestHandlerOutput>failed()
                         .withError(ErrorType.APPLICATION, "Couldn't read element counters config data from DB")
-                        .buildFuture();
+                        .buildFuture());
+                return;
             }
-            if (!ingressRequestData.get().isPresent() && !egressRequestData.get().isPresent()) {
+            Optional<CounterRequests> ingressRequestOpt =
+                    tx.read(LogicalDatastoreType.CONFIGURATION, ingressPath).checkedGet();
+            Optional<CounterRequests> egressRequestOpt =
+                    tx.read(LogicalDatastoreType.CONFIGURATION, egressPath).checkedGet();
+            if (!ingressRequestOpt.isPresent() && !egressRequestOpt.isPresent()) {
                 LOG.warn("Handler does not exists");
                 StatisticsPluginImplCounters.unknown_request_handler.inc();
-                return RpcResultBuilder.<ReleaseElementCountersRequestHandlerOutput>failed()
-                        .withError(ErrorType.APPLICATION, "Handler does not exists").buildFuture();
+                result.setFuture(RpcResultBuilder.<ReleaseElementCountersRequestHandlerOutput>failed()
+                        .withError(ErrorType.APPLICATION, "Handler does not exists").buildFuture());
+                return;
             }
             String generatedKey = null;
-            if (ingressRequestData.get().isPresent()) {
-                handleReleaseTransaction(input, ingressPath, ingressRequestData, iecrcOpt.get().getCounterRequests());
-                generatedKey = ingressRequestData.get().get().getGeneratedUniqueId();
+            if (ingressRequestOpt.isPresent()) {
+                handleReleaseTransaction(tx, input, ingressPath, ingressRequestOpt,
+                        iecrcOpt.get().getCounterRequests());
+                generatedKey = ingressRequestOpt.get().getGeneratedUniqueId();
             }
-            if (egressRequestData.get().isPresent()) {
-                handleReleaseTransaction(input, egressPath, egressRequestData, eecrcOpt.get().getCounterRequests());
-                generatedKey = egressRequestData.get().get().getGeneratedUniqueId();
+            if (egressRequestOpt.isPresent()) {
+                handleReleaseTransaction(tx, input, egressPath, egressRequestOpt, eecrcOpt.get().getCounterRequests());
+                generatedKey = egressRequestOpt.get().getGeneratedUniqueId();
             }
             releaseId(generatedKey);
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.warn("failed to get counter request data from DB");
-            return RpcResultBuilder.<ReleaseElementCountersRequestHandlerOutput>failed()
-                    .withError(ErrorType.APPLICATION, "failed to get counter request data from DB").buildFuture();
-        }
+            result.setFuture(RpcResultBuilder.<ReleaseElementCountersRequestHandlerOutput>success().buildFuture());
+        }), LOG, "Error releasing element counters");
 
-        return RpcResultBuilder.<ReleaseElementCountersRequestHandlerOutput>success().buildFuture();
+        return result;
     }
 
     @Override
@@ -532,19 +536,22 @@ public class StatisticsImpl implements StatisticsService, ICountersInterfaceChan
     }
 
     private void deleteCounterRequest(CounterRequests counterRequest, ElementCountersDirection direction) {
-        WriteTransaction tx = db.newWriteOnlyTransaction();
-        if (ElementCountersDirection.INGRESS.equals(direction)) {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier
-                    .builder(IngressElementCountersRequestConfig.class)
-                    .child(CounterRequests.class, new CounterRequestsKey(counterRequest.getKey().getRequestId()))
-                    .build());
-        } else if (ElementCountersDirection.EGRESS.equals(direction)) {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier
-                    .builder(EgressElementCountersRequestConfig.class)
-                    .child(CounterRequests.class, new CounterRequestsKey(counterRequest.getKey().getRequestId()))
-                    .build());
-        }
-        tx.submit();
+        ListenableFutures.addErrorLogging(
+            txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                if (ElementCountersDirection.INGRESS.equals(direction)) {
+                    tx.delete(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier
+                            .builder(IngressElementCountersRequestConfig.class)
+                            .child(CounterRequests.class,
+                                    new CounterRequestsKey(counterRequest.getKey().getRequestId()))
+                            .build());
+                } else if (ElementCountersDirection.EGRESS.equals(direction)) {
+                    tx.delete(LogicalDatastoreType.CONFIGURATION, InstanceIdentifier
+                            .builder(EgressElementCountersRequestConfig.class)
+                            .child(CounterRequests.class,
+                                    new CounterRequestsKey(counterRequest.getKey().getRequestId()))
+                            .build());
+                }
+            }), LOG, "Error deleting counter");
     }
 
     private CounterResultDataStructure createElementCountersResult(CounterRequests counterRequest) {
@@ -565,36 +572,30 @@ public class StatisticsImpl implements StatisticsService, ICountersInterfaceChan
     }
 
     private void initializeCountrsConfigDataSrore() {
-        ReadWriteTransaction transaction = db.newReadWriteTransaction();
-        CheckedFuture<Optional<IngressElementCountersRequestConfig>, ReadFailedException> iecrc =
-                transaction.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.IECRC_IDENTIFIER);
-        CheckedFuture<Optional<EgressElementCountersRequestConfig>, ReadFailedException> eecrc =
-                transaction.read(LogicalDatastoreType.CONFIGURATION, CountersServiceUtils.EECRC_IDENTIFIER);
-        try {
-            Optional<IngressElementCountersRequestConfig> iecrcOpt = iecrc.get();
-            if (!iecrcOpt.isPresent()) {
-                creatIngressEelementCountersContainerInConfig(transaction, CountersServiceUtils.IECRC_IDENTIFIER);
-            }
-
-            Optional<EgressElementCountersRequestConfig> eecrcOpt = eecrc.get();
-            if (!eecrcOpt.isPresent()) {
-                creatEgressEelementCountersContainerInConfig(transaction, CountersServiceUtils.EECRC_IDENTIFIER);
-            }
-            transaction.submit();
-        } catch (InterruptedException | ExecutionException e) {
-            StatisticsPluginImplCounters.failed_creating_counters_config.inc();
-            LOG.warn("failed creating counters config data structure in DB");
-        }
+        ListenableFutures.addErrorLogging(
+            txRunner.callWithNewReadWriteTransactionAndSubmit(transaction -> {
+                Optional<IngressElementCountersRequestConfig> iecrcOpt =
+                        transaction.read(LogicalDatastoreType.CONFIGURATION,
+                                CountersServiceUtils.IECRC_IDENTIFIER).checkedGet();
+                Optional<EgressElementCountersRequestConfig> eecrcOpt =
+                        transaction.read(LogicalDatastoreType.CONFIGURATION,
+                                CountersServiceUtils.EECRC_IDENTIFIER).checkedGet();
+                if (!iecrcOpt.isPresent()) {
+                    creatIngressEelementCountersContainerInConfig(transaction,
+                            CountersServiceUtils.IECRC_IDENTIFIER);
+                }
+                if (!eecrcOpt.isPresent()) {
+                    creatEgressEelementCountersContainerInConfig(transaction,
+                            CountersServiceUtils.EECRC_IDENTIFIER);
+                }
+            }), LOG, "Failed to create counters in config datastore");
     }
 
-    private void handleReleaseTransaction(ReleaseElementCountersRequestHandlerInput input,
-            InstanceIdentifier<CounterRequests> path,
-            CheckedFuture<Optional<CounterRequests>, ReadFailedException> requestData,
-            List<CounterRequests> counterRequests) throws InterruptedException, ExecutionException {
-        WriteTransaction wt = db.newWriteOnlyTransaction();
-        wt.delete(LogicalDatastoreType.CONFIGURATION, path);
-        wt.submit();
-        CounterRequests counterRequest = requestData.get().get();
+    private void handleReleaseTransaction(WriteTransaction transaction, ReleaseElementCountersRequestHandlerInput input,
+            InstanceIdentifier<CounterRequests> path, Optional<CounterRequests> requestData,
+            List<CounterRequests> counterRequests) {
+        transaction.delete(LogicalDatastoreType.CONFIGURATION, path);
+        CounterRequests counterRequest = requestData.get();
         if (shouldUnbindCountersService(counterRequest.getPortId(), counterRequest.getKey().getRequestId(),
                 counterRequests)) {
             unbindCountersServiceIfBound(counterRequest.getPortId(),