Bump MRI upstreams
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorGuardDecorator.java
index f879aea76f3a3ddee05b77e71ffe9fb7b159edaa..ebd10d89ef4f4138def2719b41302761bd32d481 100644 (file)
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.base.Preconditions;\r
-import com.google.common.util.concurrent.FutureCallback;\r
-import com.google.common.util.concurrent.Futures;\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.TimeUnit;\r
-import javax.annotation.Nullable;\r
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Decorator for NodeId level syncup locking.\r
- */\r
-public class SyncReactorGuardDecorator implements SyncReactor {\r
-\r
-    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
-\r
-    private final SyncReactor delegate;\r
-    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
-\r
-    public SyncReactorGuardDecorator(SyncReactor delegate,\r
-            SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
-        this.delegate = delegate;\r
-        this.semaphoreKeeper = semaphoreKeeper;\r
-    }\r
-\r
-    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
-                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
-                                            final LogicalDatastoreType dsType) throws InterruptedException {\r
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-        LOG.trace("syncup guard {}", nodeId.getValue());\r
-\r
-        final long stampBeforeGuard = System.nanoTime();\r
-        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
-\r
-        try {\r
-            final long stampAfterGuard = System.nanoTime();\r
-            if (LOG.isDebugEnabled()) {\r
-                LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
-                        formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                        guard, threadName());\r
-            }\r
-\r
-            final ListenableFuture<Boolean> endResult =\r
-                    delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException\r
-\r
-            Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
-                @Override\r
-                public void onSuccess(@Nullable final Boolean result) {\r
-                    if (LOG.isDebugEnabled()) {\r
-                        final long stampFinished = System.nanoTime();\r
-                        LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
-                                formatNanos(stampFinished - stampBeforeGuard),\r
-                                formatNanos(stampFinished - stampAfterGuard),\r
-                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                                guard.availablePermits(), threadName());\r
-                    }\r
-\r
-                    releaseGuardForNodeId(guard);\r
-                }\r
-\r
-                @Override\r
-                public void onFailure(final Throwable t) {\r
-                    if (LOG.isDebugEnabled()) {\r
-                        final long stampFinished = System.nanoTime();\r
-                        LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
-                                formatNanos(stampFinished - stampBeforeGuard),\r
-                                formatNanos(stampFinished - stampAfterGuard),\r
-                                formatNanos(stampAfterGuard - stampBeforeGuard),\r
-                                guard.availablePermits(), threadName());\r
-                    }\r
-\r
-                    releaseGuardForNodeId(guard);\r
-                }\r
-            });\r
-            return endResult;\r
-        } catch (InterruptedException e) {\r
-            releaseGuardForNodeId(guard);\r
-            throw e;\r
-        }\r
-    }\r
-\r
-    private String formatNanos(long nanos) {\r
-        return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
-    }\r
-\r
-    /**\r
-     * Get guard and lock for node.\r
-     * @param flowcapableNodePath II of node for which guard should be acquired\r
-     * @return semaphore guard\r
-     */\r
-    private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
-            throws InterruptedException {\r
-        final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
-                "no guard for " + flowcapableNodePath);\r
-\r
-        if (LOG.isDebugEnabled()) {\r
-            final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
-            try {\r
-                LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
-            } catch (Exception e) {\r
-                LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
-            }\r
-        }\r
-\r
-        guard.acquire();\r
-        return guard;\r
-    }\r
-\r
-    /**\r
-     * Unlock and release guard.\r
-     * @param guard semaphore guard which should be unlocked\r
-     */\r
-    private void releaseGuardForNodeId(final Semaphore guard) {\r
-        if (guard == null) {\r
-            return;\r
-        }\r
-        guard.release();\r
-    }\r
-\r
-    private static String threadName() {\r
-        final Thread currentThread = Thread.currentThread();\r
-        return currentThread.getName();\r
-    }\r
-\r
-}\r
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for NodeId level syncup locking.
+ */
+public class SyncReactorGuardDecorator implements SyncReactor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
+    private final SyncReactor delegate;
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+            new SemaphoreKeeperGuavaImpl<>(1, true);
+
+    public SyncReactorGuardDecorator(final SyncReactor delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                            final SyncupEntry syncupEntry) {
+        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+        final long stampBeforeGuard = System.nanoTime();
+        final Semaphore guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+        if (guard == null) {
+            return Futures.immediateFuture(Boolean.FALSE);
+        }
+        final long stampAfterGuard = System.nanoTime();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue());
+        }
+        final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
+        Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId),
+                MoreExecutors.directExecutor());
+        return endResult;
+    }
+
+    private FutureCallback<Boolean> createSyncupCallback(final Semaphore guard,
+                                                         final long stampBeforeGuard,
+                                                         final long stampAfterGuard,
+                                                         final NodeId nodeId) {
+        return new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                if (LOG.isDebugEnabled()) {
+                    final long stampFinished = System.nanoTime();
+                    LOG.debug("Syncup finished {} took:{} rpc:{} wait:{}", nodeId.getValue(),
+                            formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
+                            formatNanos(stampAfterGuard - stampBeforeGuard));
+                }
+                semaphoreKeeper.releaseGuard(guard);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                final long stampFinished = System.nanoTime();
+                LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(),
+                        formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
+                        formatNanos(stampAfterGuard - stampBeforeGuard));
+                semaphoreKeeper.releaseGuard(guard);
+            }
+        };
+    }
+
+    private static String formatNanos(final long nanos) {
+        return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
+    }
+}