Merge "Bug 5543 - Bo: Update JUnit tests part_9"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorGuardDecorator.java
index 90ead252bc52be1c458fb118b7e2b9da0199206e..e40b3b2200a1eda0fd070ea7553a6f8ba75e5f49 100644 (file)
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.base.Preconditions;\r
-import com.google.common.util.concurrent.FutureCallback;\r
-import com.google.common.util.concurrent.Futures;\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.TimeUnit;\r
-import javax.annotation.Nullable;\r
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Decorator for NodeId level syncup locking.\r
- */\r
-public class SyncReactorGuardDecorator implements SyncReactor {\r
-\r
-    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
-\r
-    private final SyncReactor delegate;\r
-    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
-\r
-    public SyncReactorGuardDecorator(SyncReactor delegate,\r
-            SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
-        this.delegate = delegate;\r
-        this.semaphoreKeeper = semaphoreKeeper;\r
-    }\r
-\r
-    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
-                                            final LogicalDatastoreType dsType) throws InterruptedException {\r
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-        LOG.trace("syncup {}", nodeId.getValue());\r
-\r
-        final long stampBeforeGuard = System.nanoTime();\r
-        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
-\r
-        try {\r
-            final long stampAfterGuard = System.nanoTime();\r
-            if (LOG.isDebugEnabled()) {\r
-                LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
-                        formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                        guard, threadName());\r
-            }\r
-\r
-            final ListenableFuture<Boolean> endResult =\r
-                    delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException\r
-\r
-            Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
-                @Override\r
-                public void onSuccess(@Nullable final Boolean result) {\r
-                    if (LOG.isDebugEnabled()) {\r
-                        final long stampFinished = System.nanoTime();\r
-                        LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
-                                formatNanos(stampFinished - stampBeforeGuard),\r
-                                formatNanos(stampFinished - stampAfterGuard),\r
-                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                                guard, threadName());\r
-                    }\r
-\r
-                    releaseGuardForNodeId(nodeId, guard);\r
-                }\r
-\r
-                @Override\r
-                public void onFailure(final Throwable t) {\r
-                    if (LOG.isDebugEnabled()) {\r
-                        final long stampFinished = System.nanoTime();\r
-                        LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
-                                formatNanos(stampFinished - stampBeforeGuard),\r
-                                formatNanos(stampFinished - stampAfterGuard),\r
-                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                                guard, threadName());\r
-                    }\r
-\r
-                    releaseGuardForNodeId(nodeId, guard);\r
-                }\r
-            });\r
-            return endResult;\r
-        } catch(InterruptedException e) {\r
-            releaseGuardForNodeId(nodeId, guard);\r
-            throw e;\r
-        }\r
-    }\r
-\r
-    protected String formatNanos(long nanos) {\r
-        return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
-    }\r
-\r
-    /**\r
-     * get guard\r
-     *\r
-     * @param flowcapableNodePath II of node for which guard should be acquired\r
-     * @return semaphore guard\r
-     */\r
-    protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
-            throws InterruptedException {\r
-        final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
-                "no guard for " + flowcapableNodePath);\r
-\r
-        if (LOG.isDebugEnabled()) {\r
-            final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-            try {\r
-                LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
-            } catch (Exception e) {\r
-                LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
-            }\r
-        }\r
-\r
-        guard.acquire();\r
-        return guard;\r
-    }\r
-\r
-    /**\r
-     * unlock per node\r
-     *\r
-     * @param nodeId NodeId of node which should be unlocked\r
-     * @param guard semaphore guard\r
-     */\r
-    protected void releaseGuardForNodeId(final NodeId nodeId, final Semaphore guard) {\r
-        if (guard == null) {\r
-            return;\r
-        }\r
-        guard.release();\r
-    }\r
-\r
-    static String threadName() {\r
-        final Thread currentThread = Thread.currentThread();\r
-        return currentThread.getName();\r
-    }\r
-\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for NodeId level syncup locking.
+ */
+public class SyncReactorGuardDecorator implements SyncReactor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
+
+    private final SyncReactor delegate;
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
+
+    public SyncReactorGuardDecorator(SyncReactor delegate,
+            SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+        this.delegate = delegate;
+        this.semaphoreKeeper = semaphoreKeeper;
+    }
+
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+                                            final LogicalDatastoreType dsType) throws InterruptedException {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        LOG.trace("syncup guard {}", nodeId.getValue());
+
+        final long stampBeforeGuard = System.nanoTime();
+        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);
+        if (guard == null) {
+            return Futures.immediateFuture(false);
+        }
+        final long stampAfterGuard = System.nanoTime();
+
+        try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),
+                        formatNanos(stampAfterGuard - stampBeforeGuard),
+                        guard, threadName());
+            }
+
+            final ListenableFuture<Boolean> endResult =
+                    delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+
+            Futures.addCallback(endResult, new FutureCallback<Boolean>() {
+                @Override
+                public void onSuccess(@Nullable final Boolean result) {
+                    if (LOG.isDebugEnabled()) {
+                        final long stampFinished = System.nanoTime();
+                        LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+                                formatNanos(stampFinished - stampBeforeGuard),
+                                formatNanos(stampFinished - stampAfterGuard),
+                                formatNanos(stampAfterGuard - stampBeforeGuard),
+                                guard.availablePermits(), threadName());
+                    }
+
+                    releaseGuardForNodeId(guard);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    if (LOG.isDebugEnabled()) {
+                        final long stampFinished = System.nanoTime();
+                        LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+                                formatNanos(stampFinished - stampBeforeGuard),
+                                formatNanos(stampFinished - stampAfterGuard),
+                                formatNanos(stampAfterGuard - stampBeforeGuard),
+                                guard.availablePermits(), threadName());
+                    }
+
+                    releaseGuardForNodeId(guard);
+                }
+            });
+            return endResult;
+        } catch (InterruptedException e) {
+            releaseGuardForNodeId(guard);
+            throw e;
+        }
+    }
+
+    private String formatNanos(long nanos) {
+        return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
+    }
+
+    /**
+     * Get guard and lock for node.
+     * @param flowcapableNodePath II of node for which guard should be acquired
+     * @return semaphore guard
+     */
+    private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),
+                "no guard for " + nodeId.getValue());
+        try {
+            guard.acquire();
+        } catch (InterruptedException e) {
+            LOG.error("syncup summon {} failed {}", nodeId.getValue(), e);
+            return null;
+        }
+        LOG.trace("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
+        return guard;
+    }
+
+    /**
+     * Unlock and release guard.
+     * @param guard semaphore guard which should be unlocked
+     */
+    private void releaseGuardForNodeId(final Semaphore guard) {
+        if (guard != null) {
+            guard.release();
+            LOG.trace("syncup release guard:{} thread:{}", guard, threadName());
+        }
+    }
+
+    private static String threadName() {
+        final Thread currentThread = Thread.currentThread();
+        return currentThread.getName();
+    }
+
+}