Bug 5578 Improve frsync (threadpool) 67/40667/4
authorAndrej Leitner <anleitne@cisco.com>
Wed, 25 May 2016 11:22:52 +0000 (13:22 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Thu, 30 Jun 2016 14:16:08 +0000 (16:16 +0200)
  - cached thread pool instead of fixed thread pool
  - FrmExecutors removed cause it's unnecessary
  - removed loopers for listeners registration,
    not needed since using onSessionInitiated()
  - handeled interrupted exception in guard decorator

Change-Id: I8255944aeedc29b147cfb637058dbd7b2af5f9d1
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/FrmExecutors.java [deleted file]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlatBatchServiceImpl.java

index 24bfeafe06522d730c736a3bd82a83491562a42e..0e5ae5e568439bec77f662be6b8920c537ab698e 100644 (file)
@@ -10,9 +10,11 @@ package org.opendaylight.openflowplugin.applications.frsync.impl;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -29,7 +31,6 @@ import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSn
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
-import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
@@ -46,8 +47,6 @@ import org.slf4j.LoggerFactory;
 public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
-    private static final int STARTUP_LOOP_TICK = 500;
-    private static final int STARTUP_LOOP_MAX_RETRIES = 8;
 
     private final DataBroker dataService;
     private final SalTableService salTableService;
@@ -60,13 +59,14 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
     private static final InstanceIdentifier<Node> NODE_WC_PATH =
             InstanceIdentifier.create(Nodes.class).child(Node.class);
 
-
     private final DataTreeIdentifier<FlowCapableNode> nodeConfigDataTreePath;
     private final DataTreeIdentifier<Node> nodeOperationalDataTreePath;
 
     private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
     private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
 
+    private final ListeningExecutorService syncThreadPool;
+
     public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
                                        final DataBroker dataBroker,
                                        final RpcConsumerRegistry rpcRegistry) {
@@ -80,24 +80,16 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         nodeConfigDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
         nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
 
+        final ExecutorService executorService= Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+                .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+                .setDaemon(false)
+                .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
+                .build());
+        syncThreadPool = MoreExecutors.listeningDecorator(executorService);
+
         broker.registerProvider(this);
     }
 
-    private final ListeningExecutorService syncThreadPool = FrmExecutors.instance()
-            // TODO improve log in ThreadPoolExecutor.afterExecute
-            // TODO max bloking queue size
-            // TODO core/min pool size
-            .newFixedThreadPool(6, new ThreadFactoryBuilder()
-                    .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
-                    .setDaemon(false)
-                    .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-                        @Override
-                        public void uncaughtException(Thread thread, Throwable e) {
-                            LOG.error("uncaught exception {}", thread, e);
-                        }
-                    })
-                    .build());
-
     @Override
     public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
         final TableForwarder tableForwarder = new TableForwarder(salTableService);
@@ -127,38 +119,20 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         final NodeListener<Node> nodeListenerOperational =
                 new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
 
-        try {
-            SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
-            dataTreeConfigChangeListener = looper1.loopUntilNoException(
-                    new Callable<ListenerRegistration<NodeListener>>() {
-                        @Override
-                        public ListenerRegistration<NodeListener> call() throws Exception {
-                            return dataService.registerDataTreeChangeListener(
-                                    nodeConfigDataTreePath, nodeListenerConfig);
-                        }
-                    });
-
-            SimpleTaskRetryLooper looper2 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
-            dataTreeOperationalChangeListener = looper2.loopUntilNoException(
-                    new Callable<ListenerRegistration<NodeListener>>() {
-                        @Override
-                        public ListenerRegistration<NodeListener> call() throws Exception {
-                            return dataService.registerDataTreeChangeListener(
-                                    nodeOperationalDataTreePath, nodeListenerOperational);
-                        }
-                    });
-        } catch (final Exception e) {
-            LOG.warn("FR-Sync node DataChange listener registration fail!", e);
-            throw new IllegalStateException("FR-Sync startup fail!", e);
-        }
+        dataTreeConfigChangeListener =
+                dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
+        dataTreeOperationalChangeListener =
+                dataService.registerDataTreeChangeListener(nodeOperationalDataTreePath, nodeListenerOperational);
+
         LOG.info("ForwardingRulesSync has started.");
     }
 
-    public void close() throws Exception {
+    public void close() throws InterruptedException {
         if (dataTreeConfigChangeListener != null) {
             dataTreeConfigChangeListener.close();
             dataTreeConfigChangeListener = null;
         }
+
         if (dataTreeOperationalChangeListener != null) {
             dataTreeOperationalChangeListener.close();
             dataTreeOperationalChangeListener = null;
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/FrmExecutors.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/FrmExecutors.java
deleted file mode 100644 (file)
index 4b8eb4e..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.applications.frsync.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Static Factory for creating ExecutorServices (because there is no dependency injection but
- * static getInstance).
- */
-public final class FrmExecutors {
-    public static PceExecutorsFactory instance() {
-        return DEFAULT_EXECUTORS;
-    }
-
-    public interface PceExecutorsFactory {
-
-        ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory);
-    }
-
-    /**
-     * This will be rewritten in JUnits using SynchronousExecutorService.
-     */
-    @VisibleForTesting // should not be private and final
-    static PceExecutorsFactory DEFAULT_EXECUTORS = new PceExecutorsFactory() {
-
-        public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory) {
-            final ExecutorService executorService = Executors.newFixedThreadPool(nThreads, factory);
-            return MoreExecutors.listeningDecorator(executorService);
-        }
-    };
-}
index 9def21853c861d8d4634ec978cf3499779d7f3e6..e40b3b2200a1eda0fd070ea7553a6f8ba75e5f49 100644 (file)
@@ -48,10 +48,13 @@ public class SyncReactorGuardDecorator implements SyncReactor {
         LOG.trace("syncup guard {}", nodeId.getValue());
 
         final long stampBeforeGuard = System.nanoTime();
-        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException
+        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);
+        if (guard == null) {
+            return Futures.immediateFuture(false);
+        }
+        final long stampAfterGuard = System.nanoTime();
 
         try {
-            final long stampAfterGuard = System.nanoTime();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),
                         formatNanos(stampAfterGuard - stampBeforeGuard),
@@ -59,7 +62,7 @@ public class SyncReactorGuardDecorator implements SyncReactor {
             }
 
             final ListenableFuture<Boolean> endResult =
-                    delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException
+                    delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
 
             Futures.addCallback(endResult, new FutureCallback<Boolean>() {
                 @Override
@@ -91,7 +94,7 @@ public class SyncReactorGuardDecorator implements SyncReactor {
                 }
             });
             return endResult;
-        } catch(InterruptedException e) {
+        } catch (InterruptedException e) {
             releaseGuardForNodeId(guard);
             throw e;
         }
@@ -106,21 +109,17 @@ public class SyncReactorGuardDecorator implements SyncReactor {
      * @param flowcapableNodePath II of node for which guard should be acquired
      * @return semaphore guard
      */
-    private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)
-            throws InterruptedException {
+    private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
         final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),
-                "no guard for " + flowcapableNodePath);
-
-        if (LOG.isDebugEnabled()) {
-            final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
-            try {
-                LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
-            } catch (Exception e) {
-                LOG.error("error logging guard after summon before aquiring {}", nodeId);
-            }
+                "no guard for " + nodeId.getValue());
+        try {
+            guard.acquire();
+        } catch (InterruptedException e) {
+            LOG.error("syncup summon {} failed {}", nodeId.getValue(), e);
+            return null;
         }
-
-        guard.acquire();
+        LOG.trace("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
         return guard;
     }
 
@@ -129,10 +128,10 @@ public class SyncReactorGuardDecorator implements SyncReactor {
      * @param guard semaphore guard which should be unlocked
      */
     private void releaseGuardForNodeId(final Semaphore guard) {
-        if (guard == null) {
-            return;
+        if (guard != null) {
+            guard.release();
+            LOG.trace("syncup release guard:{} thread:{}", guard, threadName());
         }
-        guard.release();
     }
 
     private static String threadName() {
index f9fb4410cee18cb0946881a70f29ab14943f441e..6a059bc1da7caaee7b6e693f82ef6b29510bf9e3 100644 (file)
@@ -11,10 +11,13 @@ package org.opendaylight.openflowplugin.applications.frsync.impl;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
@@ -54,12 +57,12 @@ public class SyncReactorFutureZipDecoratorTest {
 
     @Before
     public void setUp() {
-        syncThreadPool = FrmExecutors.instance()
-                .newFixedThreadPool(1, new ThreadFactoryBuilder()
-                        .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
-                        .setDaemon(false)
-                        .build());
-
+        final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+                .setDaemon(false)
+                .setNameFormat("frsync-test%d")
+                .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
+                .build());
+        syncThreadPool = MoreExecutors.listeningDecorator(executorService);
         reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
         fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
                 .augmentation(FlowCapableNode.class);
index 6438e799d0894f7fff04b6f9937c99e7aa6df16c..56989de6993d4dff6dea6421e5cb90c114fe50da 100644 (file)
@@ -88,7 +88,8 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
     }
 
     @VisibleForTesting
-    Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>> batchChainElements) {
+    Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<AsyncFunction<RpcResult<ProcessFlatBatchOutput>,
+            RpcResult<ProcessFlatBatchOutput>>> batchChainElements) {
         ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
                 RpcResultBuilder.success(new ProcessFlatBatchOutputBuilder().build()).buildFuture();
 
@@ -97,7 +98,6 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
         }
 
         return chainSummaryResult;
-
     }
 
     @VisibleForTesting
@@ -113,7 +113,7 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
             final int currentOffset = stepOffset;
             chainJobs.add(new AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>() {
                 @Override
-                public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) throws Exception {
+                public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) {
                     if (exitOnFirstError && !chainInput.isSuccessful()) {
                         LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
                         return Futures.immediateFuture(chainInput);
@@ -125,56 +125,47 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
                     final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
                     switch (planStep.getStepType()) {
                         case FLOW_ADD:
-                            final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(
-                                    planStep, node);
+                            final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
                             final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
                             chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultAddFlowFuture, currentOffset);
                             break;
                         case FLOW_REMOVE:
-                            final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(
-                                    planStep, node);
+                            final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
                             final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
                             chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultRemoveFlowFuture, currentOffset);
                             break;
                         case FLOW_UPDATE:
-                            final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(
-                                    planStep, node);
+                            final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
                             final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
                             chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(chainInput, resultUpdateFlowFuture, currentOffset);
                             break;
                         case GROUP_ADD:
-                            final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(
-                                    planStep, node);
+                            final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
                             final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
                             chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultAddGroupFuture, currentOffset);
                             break;
                         case GROUP_REMOVE:
-                            final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(
-                                    planStep, node);
+                            final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
                             final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
                             chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultRemoveGroupFuture, currentOffset);
                             break;
                         case GROUP_UPDATE:
-                            final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(
-                                    planStep, node);
+                            final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
                             final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
                             chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(chainInput, resultUpdateGroupFuture, currentOffset);
                             break;
                         case METER_ADD:
-                            final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(
-                                    planStep, node);
+                            final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
                             final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
                             chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultAddMeterFuture, currentOffset);
                             break;
                         case METER_REMOVE:
-                            final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(
-                                    planStep, node);
+                            final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
                             final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
                             chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultRemoveMeterFuture, currentOffset);
                             break;
                         case METER_UPDATE:
-                            final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(
-                                    planStep, node);
+                            final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
                             final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
                             chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(chainInput, resultUpdateMeterFuture, currentOffset);
                             break;