Bug 5575 added SyncReactorDecorators and thread pool 52/38152/8
authorAndrej Leitner <anleitne@cisco.com>
Tue, 26 Apr 2016 14:20:32 +0000 (16:20 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Tue, 24 May 2016 06:32:06 +0000 (08:32 +0200)
  - Guard, Future, Future w/compression decorators
  - thread pool and semaphore keeper

Change-Id: I4d8d3327ef3f318cc17534e789134204977550f7
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/FrmExecutors.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureWithCompressionDecorator.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java [new file with mode: 0644]

diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java
new file mode 100644 (file)
index 0000000..8185bb0
--- /dev/null
@@ -0,0 +1,40 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync;
+
+import java.util.concurrent.Semaphore;
+import javax.annotation.Nonnull;
+
+/**
+ * Proposal for how a key based semaphore provider should look like.
+ * <ul>
+ * <li>thread safe</li>
+ * <li>garbage-collect unused semaphores</li>
+ * <li>for the same key there must be always only one semaphore available</li>
+ * </ul>
+ *
+ *
+ * usage:
+ * <pre>
+ * final Semaphore guard = semaphoreKeeper.summonGuard(key);
+ * guard.acquire();
+ * // guard protected logic ...
+ * guard.release();
+ * </pre>
+ *
+ * @param <K> key type
+ */
+
+public interface SemaphoreKeeper<K> {
+    /**
+     * @param key semaphore identifier
+     * @return new or existing semaphore for given key, for one key there is always only one semaphore available
+     */
+    Semaphore summonGuard(@Nonnull K key);
+}
index eb5b46f292bd08291d93e8c388fc3805df17a543..aa1f8a72ba2a9cc694138b0e70b21289f100cc7f 100644 (file)
@@ -9,16 +9,21 @@
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.lang.Thread.UncaughtExceptionHandler;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
@@ -82,6 +87,21 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         broker.registerProvider(this);
     }
 
+    private final ListeningExecutorService syncThreadPool = FrmExecutors.instance()
+            // TODO improve log in ThreadPoolExecutor.afterExecute
+            // TODO max bloking queue size
+            // TODO core/min pool size
+            .newFixedThreadPool(6, new ThreadFactoryBuilder()
+                    .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+                    .setDaemon(false)
+                    .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+                        @Override
+                        public void uncaughtException(Thread thread, Throwable e) {
+                            LOG.error("uncaught exception {}", thread, e);
+                        }
+                    })
+                    .build());
+
     @Override
     public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
         final FlowForwarder flowForwarder = new FlowForwarder(salFlowService);
@@ -91,7 +111,17 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
 
         {
             final SyncReactorImpl syncReactorImpl = new SyncReactorImpl();
-            
+            final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl
+                    .setFlowForwarder(flowForwarder)
+                    .setGroupForwarder(groupForwarder)
+                    .setMeterForwarder(meterForwarder)
+                    .setTableForwarder(tableForwarder)
+                    .setTransactionService(transactionService),
+                    new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+
+            final SyncReactor cfgReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
+            final SyncReactor operReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
+
             final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
             final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
             final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/FrmExecutors.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/FrmExecutors.java
new file mode 100644 (file)
index 0000000..44e625d
--- /dev/null
@@ -0,0 +1,44 @@
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.ThreadFactory;\r
+\r
+import com.google.common.annotations.VisibleForTesting;\r
+import com.google.common.util.concurrent.ListeningExecutorService;\r
+import com.google.common.util.concurrent.MoreExecutors;\r
+\r
+/**\r
+ * Static Factory for creating ExecutorServicess (because there is no dependency injection but\r
+ * static getInstance).\r
+ */\r
+public final class FrmExecutors {\r
+    public static PceExecursFactory instance() {\r
+        return DEFAULT_EXECUTORS;\r
+    }\r
+\r
+    public interface PceExecursFactory {\r
+\r
+        public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory);\r
+    }\r
+\r
+    /**\r
+     * This will be rewritten in JUnits using SynchronousExecutorService\r
+     */\r
+    @VisibleForTesting // should not be private and final\r
+    static PceExecursFactory DEFAULT_EXECUTORS = new PceExecursFactory() {\r
+\r
+        public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory) {\r
+            final ExecutorService executorService = Executors.newFixedThreadPool(nThreads, factory);\r
+            return MoreExecutors.listeningDecorator(executorService);\r
+        }\r
+    };\r
+}\r
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java
new file mode 100644 (file)
index 0000000..fb0b3e9
--- /dev/null
@@ -0,0 +1,112 @@
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.concurrent.Callable;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.TimeoutException;\r
+\r
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.util.concurrent.ListenableFuture;\r
+import com.google.common.util.concurrent.ListeningExecutorService;\r
+\r
+/**\r
+ * Decorator for running delegate syncup in Future.\r
+ */\r
+public class SyncReactorFutureDecorator implements SyncReactor {\r
+\r
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);\r
+\r
+    private final SyncReactor delegate;\r
+    private final ListeningExecutorService executorService;\r
+\r
+    public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";\r
+\r
+    public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
+        this.delegate = delegate;\r
+        this.executorService = executorService;\r
+    }\r
+\r
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+            final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+        LOG.trace("syncup {}", nodeId.getValue());\r
+\r
+        final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {\r
+            public Boolean call() throws Exception {\r
+                final String oldThreadName = updateThreadName(nodeId);\r
+\r
+                try {\r
+                    final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree)\r
+                            .get(10000, TimeUnit.MILLISECONDS);\r
+                    LOG.trace("ret {} {}", nodeId.getValue(), ret);\r
+                    return true;\r
+                } catch (TimeoutException e) {\r
+                    LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);\r
+                    return false;\r
+                } finally {\r
+                    updateThreadName(oldThreadName);\r
+                }\r
+            }\r
+        });\r
+        \r
+        return syncup;\r
+    }\r
+\r
+    protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+            final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
+                    throws InterruptedException {\r
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+        LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
+\r
+        return delegate.syncup(flowcapableNodePath, configTree, operationalTree);\r
+    }\r
+\r
+    static String threadName() {\r
+        final Thread currentThread = Thread.currentThread();\r
+        return currentThread.getName();\r
+    }\r
+\r
+    protected String updateThreadName(NodeId nodeId) {\r
+        final Thread currentThread = Thread.currentThread();\r
+        final String oldName = currentThread.getName();\r
+        try {\r
+            if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
+                currentThread.setName(oldName + "@" + nodeId.getValue());\r
+            } else {\r
+                LOG.warn("try to update foreign thread name {} {}", nodeId, oldName);\r
+            }\r
+        } catch (Exception e) {\r
+            LOG.error("failed updating threadName {}", nodeId, e);\r
+        }\r
+        return oldName;\r
+    }\r
+\r
+    protected String updateThreadName(String name) {\r
+        final Thread currentThread = Thread.currentThread();\r
+        final String oldName = currentThread.getName();\r
+        try {\r
+            if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
+                currentThread.setName(name);\r
+            } else {\r
+                LOG.warn("try to update foreign thread name {} {}", oldName, name);\r
+            }\r
+        } catch (Exception e) {\r
+            LOG.error("failed updating threadName {}", name, e);\r
+        }\r
+        return oldName;\r
+    }\r
+}\r
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureWithCompressionDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureWithCompressionDecorator.java
new file mode 100644 (file)
index 0000000..44f3ee7
--- /dev/null
@@ -0,0 +1,107 @@
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.HashMap;\r
+import java.util.Map;\r
+import java.util.concurrent.Semaphore;\r
+\r
+import javax.annotation.concurrent.GuardedBy;\r
+\r
+import org.apache.commons.lang3.tuple.Pair;\r
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.util.concurrent.Futures;\r
+import com.google.common.util.concurrent.ListenableFuture;\r
+import com.google.common.util.concurrent.ListeningExecutorService;\r
+\r
+/**\r
+ * Enriches {@link SyncReactorFutureDecorator} with state compression.\r
+ */\r
+public class SyncReactorFutureWithCompressionDecorator extends SyncReactorFutureDecorator {\r
+\r
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureWithCompressionDecorator.class);\r
+\r
+    @GuardedBy("beforeCompressionGuard")\r
+    final Map<InstanceIdentifier<FlowCapableNode>, Pair<FlowCapableNode, FlowCapableNode>> beforeCompression =\r
+            new HashMap<>();\r
+    final Semaphore beforeCompressionGuard = new Semaphore(1, false);\r
+\r
+    public SyncReactorFutureWithCompressionDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
+        super(delegate, executorService);\r
+    }\r
+\r
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+            final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+        LOG.trace("syncup {}", nodeId.getValue());\r
+\r
+        try {\r
+            beforeCompressionGuard.acquire();\r
+\r
+            final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree);\r
+            if (newFutureNecessary) {\r
+                super.syncup(flowcapableNodePath, configTree, operationalTree);\r
+            }\r
+            return Futures.immediateFuture(true);\r
+        } finally {\r
+            beforeCompressionGuard.release();\r
+        }\r
+    }\r
+\r
+    protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+            final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
+                    throws InterruptedException {\r
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+        LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
+\r
+        final Pair<FlowCapableNode, FlowCapableNode> lastCompressionState =\r
+                removeLastCompressionState(flowcapableNodePath);\r
+        if (lastCompressionState == null) {\r
+            return Futures.immediateFuture(true);\r
+        } else {\r
+            return super.doSyncupInFuture(flowcapableNodePath,\r
+                    lastCompressionState.getLeft(), lastCompressionState.getRight());\r
+        }\r
+    }\r
+\r
+    protected boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+            final FlowCapableNode configTree, final FlowCapableNode operationalTree) {\r
+        final Pair<FlowCapableNode, FlowCapableNode> previous = beforeCompression.get(flowcapableNodePath);\r
+        if (previous != null) {\r
+            final FlowCapableNode previousOperational = previous.getRight();\r
+            beforeCompression.put(flowcapableNodePath, Pair.of(configTree, previousOperational));\r
+            return false;\r
+        } else {\r
+            beforeCompression.put(flowcapableNodePath, Pair.of(configTree, operationalTree));\r
+            return true;\r
+        }\r
+    }\r
+\r
+    protected Pair<FlowCapableNode/* config */, FlowCapableNode/* operational */> removeLastCompressionState(\r
+            final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {\r
+        try {\r
+            try {\r
+                beforeCompressionGuard.acquire();\r
+            } catch (InterruptedException e) {\r
+                return null;\r
+            }\r
+\r
+            return beforeCompression.remove(flowcapableNodePath);\r
+        } finally {\r
+            beforeCompressionGuard.release();\r
+        }\r
+    }\r
+}\r
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java
new file mode 100644 (file)
index 0000000..ea019e5
--- /dev/null
@@ -0,0 +1,148 @@
+/**\r
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+\r
+package org.opendaylight.openflowplugin.applications.frsync.impl;\r
+\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import javax.annotation.Nullable;\r
+\r
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.base.Preconditions;\r
+import com.google.common.util.concurrent.FutureCallback;\r
+import com.google.common.util.concurrent.Futures;\r
+import com.google.common.util.concurrent.ListenableFuture;\r
+\r
+/**\r
+ * Decorator for NodeId level syncup locking.\r
+ */\r
+public class SyncReactorGuardDecorator implements SyncReactor {\r
+\r
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
+\r
+    private final SyncReactor delegate;\r
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
+\r
+    public SyncReactorGuardDecorator(SyncReactor delegate,\r
+            SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
+        this.delegate = delegate;\r
+        this.semaphoreKeeper = semaphoreKeeper;\r
+    }\r
+\r
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
+            final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+        LOG.trace("syncup {}", nodeId.getValue());\r
+\r
+        final long stampBeforeGuard = System.nanoTime();\r
+        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
+\r
+        try {\r
+            final long stampAfterGuard = System.nanoTime();\r
+            if (LOG.isDebugEnabled()) {\r
+                LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
+                        formatNanos(stampAfterGuard - stampBeforeGuard),\r
+                        guard, threadName());\r
+            }\r
+            \r
+            final ListenableFuture<Boolean> endResult =\r
+                    delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
+            \r
+            Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
+                @Override\r
+                public void onSuccess(@Nullable final Boolean result) {\r
+                    if (LOG.isDebugEnabled()) {\r
+                        final long stampFinished = System.nanoTime();\r
+                        LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
+                                formatNanos(stampFinished - stampBeforeGuard),\r
+                                formatNanos(stampFinished - stampAfterGuard),\r
+                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
+                                guard, threadName());\r
+                    }\r
+                    \r
+                    lockReleaseForNodeId(nodeId, guard);\r
+                }\r
+                \r
+                @Override\r
+                public void onFailure(final Throwable t) {\r
+                    if (LOG.isDebugEnabled()) {\r
+                        final long stampFinished = System.nanoTime();\r
+                        LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
+                                formatNanos(stampFinished - stampBeforeGuard),\r
+                                formatNanos(stampFinished - stampAfterGuard),\r
+                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
+                                guard, threadName());\r
+                    }\r
+                    \r
+                    lockReleaseForNodeId(nodeId, guard);\r
+                }\r
+            });\r
+            return endResult;\r
+        } catch(InterruptedException e) {\r
+            lockReleaseForNodeId(nodeId, guard);\r
+            throw e;\r
+        }\r
+    }\r
+\r
+    protected String formatNanos(long nanos) {\r
+        return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
+    }\r
+\r
+    /**\r
+     * get guard\r
+     *\r
+     * @param flowcapableNodePath\r
+     * @return\r
+     */\r
+    protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
+            throws InterruptedException {\r
+        final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
+                "no guard for " + flowcapableNodePath);\r
+\r
+        if (LOG.isDebugEnabled()) {\r
+            final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
+            try {\r
+                LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
+            } catch (Exception e) {\r
+                LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
+            }\r
+        }\r
+\r
+        guard.acquire();\r
+        return guard;\r
+    }\r
+\r
+    /**\r
+     * unlock per node\r
+     *\r
+     * @param nodeId\r
+     * @param guard\r
+     */\r
+    protected void lockReleaseForNodeId(final NodeId nodeId,\r
+            final Semaphore guard) {\r
+        if (guard == null) {\r
+            return;\r
+        }\r
+        guard.release();\r
+    }\r
+\r
+    static String threadName() {\r
+        final Thread currentThread = Thread.currentThread();\r
+        return currentThread.getName();\r
+    }\r
+\r
+}\r
diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java
new file mode 100644 (file)
index 0000000..1a11c9f
--- /dev/null
@@ -0,0 +1,51 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.concurrent.Semaphore;
+
+import javax.annotation.Nonnull;
+
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Key-based semaphore provider.
+ */
+public class SemaphoreKeeperGuavaImpl<K> implements SemaphoreKeeper<K> {
+
+    private LoadingCache<K, Semaphore> semaphoreCache;
+
+    public SemaphoreKeeperGuavaImpl(final int permits, final boolean fair) {
+        semaphoreCache = CacheBuilder.newBuilder()
+                .concurrencyLevel(1)
+                .weakValues()
+                .build(new CacheLoader<K, Semaphore>() {
+                    @Override
+                    public Semaphore load(final K key) throws Exception {
+                        return new Semaphore(permits, fair) {
+                            private static final long serialVersionUID = 1L;
+                        };
+                    }
+                });
+    }
+
+    @Override
+    public Semaphore summonGuard(final @Nonnull K key) {
+        return semaphoreCache.getUnchecked(key);
+    }
+    
+    @Override
+    public String toString() {
+        return super.toString() + " size:" + (semaphoreCache == null ? null : semaphoreCache.size()) + " " + semaphoreCache;
+    }
+}