-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.base.Preconditions;\r
-import com.google.common.util.concurrent.FutureCallback;\r
-import com.google.common.util.concurrent.Futures;\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.TimeUnit;\r
-import javax.annotation.Nullable;\r
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Decorator for NodeId level syncup locking.\r
- */\r
-public class SyncReactorGuardDecorator implements SyncReactor {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
-\r
- private final SyncReactor delegate;\r
- private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
-\r
- public SyncReactorGuardDecorator(SyncReactor delegate,\r
- SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
- this.delegate = delegate;\r
- this.semaphoreKeeper = semaphoreKeeper;\r
- }\r
-\r
- public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("syncup {}", nodeId.getValue());\r
-\r
- final long stampBeforeGuard = System.nanoTime();\r
- final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
-\r
- try {\r
- final long stampAfterGuard = System.nanoTime();\r
- if (LOG.isDebugEnabled()) {\r
- LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
- formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard, threadName());\r
- }\r
- \r
- final ListenableFuture<Boolean> endResult =\r
- delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
- \r
- Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
- @Override\r
- public void onSuccess(@Nullable final Boolean result) {\r
- if (LOG.isDebugEnabled()) {\r
- final long stampFinished = System.nanoTime();\r
- LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
- formatNanos(stampFinished - stampBeforeGuard),\r
- formatNanos(stampFinished - stampAfterGuard),\r
- formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard, threadName());\r
- }\r
-\r
- releaseGuardForNodeId(nodeId, guard);\r
- }\r
-\r
- @Override\r
- public void onFailure(final Throwable t) {\r
- if (LOG.isDebugEnabled()) {\r
- final long stampFinished = System.nanoTime();\r
- LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
- formatNanos(stampFinished - stampBeforeGuard),\r
- formatNanos(stampFinished - stampAfterGuard),\r
- formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard, threadName());\r
- }\r
-\r
- releaseGuardForNodeId(nodeId, guard);\r
- }\r
- });\r
- return endResult;\r
- } catch(InterruptedException e) {\r
- releaseGuardForNodeId(nodeId, guard);\r
- throw e;\r
- }\r
- }\r
-\r
- protected String formatNanos(long nanos) {\r
- return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
- }\r
-\r
- /**\r
- * get guard\r
- *\r
- * @param flowcapableNodePath II of node for which guard should be acquired\r
- * @return semaphore guard\r
- */\r
- protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
- throws InterruptedException {\r
- final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
- "no guard for " + flowcapableNodePath);\r
-\r
- if (LOG.isDebugEnabled()) {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- try {\r
- LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
- } catch (Exception e) {\r
- LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
- }\r
- }\r
-\r
- guard.acquire();\r
- return guard;\r
- }\r
-\r
- /**\r
- * unlock per node\r
- *\r
- * @param nodeId NodeId of node which should be unlocked\r
- * @param guard semaphore guard\r
- */\r
- protected void releaseGuardForNodeId(final NodeId nodeId, final Semaphore guard) {\r
- if (guard == null) {\r
- return;\r
- }\r
- guard.release();\r
- }\r
-\r
- static String threadName() {\r
- final Thread currentThread = Thread.currentThread();\r
- return currentThread.getName();\r
- }\r
-\r
-}\r
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for NodeId level syncup locking.
+ */
+public class SyncReactorGuardDecorator implements SyncReactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
+ private final SyncReactor delegate;
+ private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+ new SemaphoreKeeperGuavaImpl<>(1, true);
+
+ public SyncReactorGuardDecorator(final SyncReactor delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final SyncupEntry syncupEntry) {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ final long stampBeforeGuard = System.nanoTime();
+ final Semaphore guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+ if (guard == null) {
+ return Futures.immediateFuture(Boolean.FALSE);
+ }
+ final long stampAfterGuard = System.nanoTime();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue());
+ }
+ final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
+ Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId),
+ MoreExecutors.directExecutor());
+ return endResult;
+ }
+
+ private FutureCallback<Boolean> createSyncupCallback(final Semaphore guard,
+ final long stampBeforeGuard,
+ final long stampAfterGuard,
+ final NodeId nodeId) {
+ return new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Boolean result) {
+ if (LOG.isDebugEnabled()) {
+ final long stampFinished = System.nanoTime();
+ LOG.debug("Syncup finished {} took:{} rpc:{} wait:{}", nodeId.getValue(),
+ formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
+ formatNanos(stampAfterGuard - stampBeforeGuard));
+ }
+ semaphoreKeeper.releaseGuard(guard);
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ final long stampFinished = System.nanoTime();
+ LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(),
+ formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
+ formatNanos(stampAfterGuard - stampBeforeGuard));
+ semaphoreKeeper.releaseGuard(guard);
+ }
+ };
+ }
+
+ private static String formatNanos(final long nanos) {
+ return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
+ }
+}