Avoid holding netty threads during resync callbacks
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowNodeReconciliationImpl.java
index 6655cb24bfc706c82265226a3bad9b36602451de..0cd969e02a74bf6d755798d4338ce16c63636e1f 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -29,6 +30,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -116,7 +118,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
     private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
     private static final String SEPARATOR = ":";
     private static final int THREAD_POOL_SIZE = 4;
-
+    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
+            .setNameFormat("BundleResync-%d")
+            .setDaemon(false)
+            .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
+            .build();
     private final DataBroker dataBroker;
     private final ForwardingRulesManager provider;
     private final String serviceName;
@@ -182,105 +188,109 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
             Optional<FlowCapableNode> flowNode = Optional.empty();
             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
             BigInteger dpnId = getDpnIdFromNodeName(node);
+            ExecutorService service = Executors.newSingleThreadExecutor(THREAD_FACTORY);
             LOG.info("Triggering bundle based reconciliation for device : {}", dpnId);
             try (ReadTransaction trans = provider.getReadTransaction()) {
                 flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
             } catch (ExecutionException | InterruptedException e) {
                 LOG.error("Error occurred while reading the configuration data store for node {}", nodeIdentity, e);
             }
-
-            if (flowNode.isPresent()) {
-                LOG.debug("FlowNode present for Datapath ID {}", dpnId);
-                OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
-                final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
-
-                final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                        .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                        .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
-
-                final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                        .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS).setType(BundleControlType.ONFBCTOPENREQUEST)
-                        .build();
-
-                final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                        .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                        .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
-
-                final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
-                        .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                        .setMessages(createMessages(nodeRef)).build();
-
-                LOG.debug("Closing openflow bundle for device {}", dpnId);
-                /* Close previously opened bundle on the openflow switch if any */
-                ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
-                        = salBundleService.controlBundle(closeBundleInput);
-
-                /* Open a new bundle on the switch */
-                ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
+            try {
+                if (flowNode.isPresent()) {
+                    LOG.debug("FlowNode present for Datapath ID {}", dpnId);
+                    OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
+                    final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
+
+                    final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                            .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                            .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
+
+                    final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                            .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                            .setType(BundleControlType.ONFBCTOPENREQUEST).build();
+
+                    final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+                            .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                            .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
+
+                    final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
+                            .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+                            .setMessages(createMessages(nodeRef)).build();
+
+                    LOG.debug("Closing openflow bundle for device {}", dpnId);
+                    /* Close previously opened bundle on the openflow switch if any */
+                    ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
+                            = salBundleService.controlBundle(closeBundleInput);
+
+                    /* Open a new bundle on the switch */
+                    ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
                         Futures.transformAsync(closeBundle,
                             rpcResult -> salBundleService.controlBundle(openBundleInput),
-                            MoreExecutors.directExecutor());
-
-                /* Push groups and flows via bundle add messages */
-                ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
-                        = Futures.transformAsync(openBundle, rpcResult -> {
-                            if (rpcResult.isSuccessful()) {
-                                return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
-                            }
-                            return Futures.immediateFuture(null);
-                        }, MoreExecutors.directExecutor());
+                                service);
 
-                /* Push flows and groups via bundle add messages */
-                Optional<FlowCapableNode> finalFlowNode = flowNode;
-                ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
-                        = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
+                    /* Push groups and flows via bundle add messages */
+                    ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
+                            = Futures.transformAsync(openBundle, rpcResult -> {
+                                if (rpcResult.isSuccessful()) {
+                                    return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
+                                }
+                                return Futures.immediateFuture(null);
+                            }, service);
+
+                    /* Push flows and groups via bundle add messages */
+                    Optional<FlowCapableNode> finalFlowNode = flowNode;
+                    ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
+                            = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
+                                if (rpcResult.isSuccessful()) {
+                                    LOG.debug("Adding delete all flow/group message is successful for device {}",dpnId);
+                                    return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
+                                            nodeIdentity));
+                                }
+                                return Futures.immediateFuture(null);
+                            }, service);
+
+                    /* Commit the bundle on the openflow switch */
+                    ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture = Futures.transformAsync(
+                            addbundlesFuture, rpcResult -> {
+                            LOG.debug("Adding bundle messages completed for device {}", dpnId);
+                            return salBundleService.controlBundle(commitBundleInput);
+                        }, service);
+
+                    /* Bundles not supported for meters */
+                    List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
+                            : Collections.emptyList();
+                    Futures.transformAsync(commitBundleFuture,
+                        rpcResult -> {
                             if (rpcResult.isSuccessful()) {
-                                LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId);
-                                return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
-                                        nodeIdentity));
+                                for (Meter meter : meters) {
+                                    final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
+                                            .child(Meter.class, meter.key());
+                                    provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
+                                }
                             }
                             return Futures.immediateFuture(null);
-                        }, MoreExecutors.directExecutor());
-
-                /* Commit the bundle on the openflow switch */
-                ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture = Futures.transformAsync(
-                        addbundlesFuture, rpcResult -> {
-                        LOG.debug("Adding bundle messages completed for device {}", dpnId);
-                        return salBundleService.controlBundle(commitBundleInput);
-                    }, MoreExecutors.directExecutor());
-
-                /* Bundles not supported for meters */
-                List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
-                        : Collections.emptyList();
-                Futures.transformAsync(commitBundleFuture,
-                    rpcResult -> {
-                        if (rpcResult.isSuccessful()) {
-                            for (Meter meter : meters) {
-                                final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
-                                        .child(Meter.class, meter.key());
-                                provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
-                            }
+                        }, service);
+                    try {
+                        RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
+                        if (bundleFuture != null && bundleFuture.isSuccessful()) {
+                            LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
+                            OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
+                            return true;
+                        } else {
+                            LOG.error("commit bundle failed for device {} with error {}", dpnId,
+                                    commitBundleFuture.get().getErrors());
+                            return false;
                         }
-                        return Futures.immediateFuture(null);
-                    }, MoreExecutors.directExecutor());
-                try {
-                    RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
-                    if (bundleFuture != null && bundleFuture.isSuccessful()) {
-                        LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
-                        OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
-                        return true;
-                    } else {
-                        LOG.error("commit bundle failed for device {} with error {}", dpnId,
-                                commitBundleFuture.get().getErrors());
+                    } catch (InterruptedException | ExecutionException e) {
+                        LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
                         return false;
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
-                    return false;
                 }
+                LOG.error("FlowNode not present for Datapath ID {}", dpnId);
+                return false;
+            }  finally {
+                service.shutdown();
             }
-            LOG.error("FlowNode not present for Datapath ID {}", dpnId);
-            return false;
         }
     }