Bump MRI upstreams
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorGuardDecorator.java
index ea019e5ee6bd72bf2c0266ee24f596014bf362c5..ebd10d89ef4f4138def2719b41302761bd32d481 100644 (file)
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.TimeUnit;\r
-\r
-import javax.annotation.Nullable;\r
-\r
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import com.google.common.base.Preconditions;\r
-import com.google.common.util.concurrent.FutureCallback;\r
-import com.google.common.util.concurrent.Futures;\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-\r
-/**\r
- * Decorator for NodeId level syncup locking.\r
- */\r
-public class SyncReactorGuardDecorator implements SyncReactor {\r
-\r
-    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
-\r
-    private final SyncReactor delegate;\r
-    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
-\r
-    public SyncReactorGuardDecorator(SyncReactor delegate,\r
-            SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
-        this.delegate = delegate;\r
-        this.semaphoreKeeper = semaphoreKeeper;\r
-    }\r
-\r
-    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-            final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-        LOG.trace("syncup {}", nodeId.getValue());\r
-\r
-        final long stampBeforeGuard = System.nanoTime();\r
-        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
-\r
-        try {\r
-            final long stampAfterGuard = System.nanoTime();\r
-            if (LOG.isDebugEnabled()) {\r
-                LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
-                        formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                        guard, threadName());\r
-            }\r
-            \r
-            final ListenableFuture<Boolean> endResult =\r
-                    delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
-            \r
-            Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
-                @Override\r
-                public void onSuccess(@Nullable final Boolean result) {\r
-                    if (LOG.isDebugEnabled()) {\r
-                        final long stampFinished = System.nanoTime();\r
-                        LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
-                                formatNanos(stampFinished - stampBeforeGuard),\r
-                                formatNanos(stampFinished - stampAfterGuard),\r
-                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                                guard, threadName());\r
-                    }\r
-                    \r
-                    lockReleaseForNodeId(nodeId, guard);\r
-                }\r
-                \r
-                @Override\r
-                public void onFailure(final Throwable t) {\r
-                    if (LOG.isDebugEnabled()) {\r
-                        final long stampFinished = System.nanoTime();\r
-                        LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
-                                formatNanos(stampFinished - stampBeforeGuard),\r
-                                formatNanos(stampFinished - stampAfterGuard),\r
-                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                                guard, threadName());\r
-                    }\r
-                    \r
-                    lockReleaseForNodeId(nodeId, guard);\r
-                }\r
-            });\r
-            return endResult;\r
-        } catch(InterruptedException e) {\r
-            lockReleaseForNodeId(nodeId, guard);\r
-            throw e;\r
-        }\r
-    }\r
-\r
-    protected String formatNanos(long nanos) {\r
-        return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
-    }\r
-\r
-    /**\r
-     * get guard\r
-     *\r
-     * @param flowcapableNodePath\r
-     * @return\r
-     */\r
-    protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
-            throws InterruptedException {\r
-        final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
-                "no guard for " + flowcapableNodePath);\r
-\r
-        if (LOG.isDebugEnabled()) {\r
-            final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-            try {\r
-                LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
-            } catch (Exception e) {\r
-                LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
-            }\r
-        }\r
-\r
-        guard.acquire();\r
-        return guard;\r
-    }\r
-\r
-    /**\r
-     * unlock per node\r
-     *\r
-     * @param nodeId\r
-     * @param guard\r
-     */\r
-    protected void lockReleaseForNodeId(final NodeId nodeId,\r
-            final Semaphore guard) {\r
-        if (guard == null) {\r
-            return;\r
-        }\r
-        guard.release();\r
-    }\r
-\r
-    static String threadName() {\r
-        final Thread currentThread = Thread.currentThread();\r
-        return currentThread.getName();\r
-    }\r
-\r
-}\r
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for NodeId level syncup locking.
+ */
+public class SyncReactorGuardDecorator implements SyncReactor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
+    private final SyncReactor delegate;
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+            new SemaphoreKeeperGuavaImpl<>(1, true);
+
+    public SyncReactorGuardDecorator(final SyncReactor delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                            final SyncupEntry syncupEntry) {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        final long stampBeforeGuard = System.nanoTime();
+        final Semaphore guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+        if (guard == null) {
+            return Futures.immediateFuture(Boolean.FALSE);
+        }
+        final long stampAfterGuard = System.nanoTime();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue());
+        }
+        final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
+        Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId),
+                MoreExecutors.directExecutor());
+        return endResult;
+    }
+
+    private FutureCallback<Boolean> createSyncupCallback(final Semaphore guard,
+                                                         final long stampBeforeGuard,
+                                                         final long stampAfterGuard,
+                                                         final NodeId nodeId) {
+        return new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                if (LOG.isDebugEnabled()) {
+                    final long stampFinished = System.nanoTime();
+                    LOG.debug("Syncup finished {} took:{} rpc:{} wait:{}", nodeId.getValue(),
+                            formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
+                            formatNanos(stampAfterGuard - stampBeforeGuard));
+                }
+                semaphoreKeeper.releaseGuard(guard);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                final long stampFinished = System.nanoTime();
+                LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(),
+                        formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
+                        formatNanos(stampAfterGuard - stampBeforeGuard));
+                semaphoreKeeper.releaseGuard(guard);
+            }
+        };
+    }
+
+    private static String formatNanos(final long nanos) {
+        return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
+    }
+}