X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-sync%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrsync%2Fimpl%2FSyncReactorGuardDecorator.java;h=91bf600760e6ec94ab50d33af944daefc7dc9ea1;hb=211844a4fda1b9ab05732680697ca507e952a285;hp=f879aea76f3a3ddee05b77e71ffe9fb7b159edaa;hpb=ed61933c71262b4239b88d7ef8a3110b4b76fabd;p=openflowplugin.git diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java index f879aea76f..91bf600760 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java @@ -1,143 +1,91 @@ -/** - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.openflowplugin.applications.frsync.impl; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper; -import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; -import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Decorator for NodeId level syncup locking. - */ -public class SyncReactorGuardDecorator implements SyncReactor { - - private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class); - - private final SyncReactor delegate; - private final SemaphoreKeeper> semaphoreKeeper; - - public SyncReactorGuardDecorator(SyncReactor delegate, - SemaphoreKeeper> semaphoreKeeper) { - this.delegate = delegate; - this.semaphoreKeeper = semaphoreKeeper; - } - - public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final FlowCapableNode configTree, final FlowCapableNode operationalTree, - final LogicalDatastoreType dsType) throws InterruptedException { - final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); - LOG.trace("syncup guard {}", nodeId.getValue()); - - final long stampBeforeGuard = System.nanoTime(); - final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException - - try { - final long stampAfterGuard = System.nanoTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(), - formatNanos(stampAfterGuard - stampBeforeGuard), - guard, threadName()); - } - - final ListenableFuture endResult = - delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException - - Futures.addCallback(endResult, new FutureCallback() { - @Override - public void onSuccess(@Nullable final Boolean result) { - if (LOG.isDebugEnabled()) { - final long stampFinished = System.nanoTime(); - LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(), - formatNanos(stampFinished - stampBeforeGuard), - formatNanos(stampFinished - stampAfterGuard), - formatNanos(stampAfterGuard - stampBeforeGuard), - guard.availablePermits(), threadName()); - } - - releaseGuardForNodeId(guard); - } - - @Override - public void onFailure(final Throwable t) { - if (LOG.isDebugEnabled()) { - final long stampFinished = System.nanoTime(); - LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(), - formatNanos(stampFinished - stampBeforeGuard), - formatNanos(stampFinished - stampAfterGuard), - formatNanos(stampAfterGuard - stampBeforeGuard), - guard.availablePermits(), threadName()); - } - - releaseGuardForNodeId(guard); - } - }); - return endResult; - } catch (InterruptedException e) { - releaseGuardForNodeId(guard); - throw e; - } - } - - private String formatNanos(long nanos) { - return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'"; - } - - /** - * Get guard and lock for node. - * @param flowcapableNodePath II of node for which guard should be acquired - * @return semaphore guard - */ - private Semaphore summonGuardAndAcquire(final InstanceIdentifier flowcapableNodePath) - throws InterruptedException { - final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath), - "no guard for " + flowcapableNodePath); - - if (LOG.isDebugEnabled()) { - final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); - try { - LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName()); - } catch (Exception e) { - LOG.error("error logging guard after summon before aquiring {}", nodeId); - } - } - - guard.acquire(); - return guard; - } - - /** - * Unlock and release guard. - * @param guard semaphore guard which should be unlocked - */ - private void releaseGuardForNodeId(final Semaphore guard) { - if (guard == null) { - return; - } - guard.release(); - } - - private static String threadName() { - final Thread currentThread = Thread.currentThread(); - return currentThread.getName(); - } - -} +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.applications.frsync.impl; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper; +import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; +import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil; +import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl; +import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Decorator for NodeId level syncup locking. + */ +public class SyncReactorGuardDecorator implements SyncReactor { + + private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class); + private final SyncReactor delegate; + private final SemaphoreKeeper> semaphoreKeeper = + new SemaphoreKeeperGuavaImpl<>(1, true); + + public SyncReactorGuardDecorator(final SyncReactor delegate) { + this.delegate = delegate; + } + + @Override + public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, + final SyncupEntry syncupEntry) { + final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); + final long stampBeforeGuard = System.nanoTime(); + final Semaphore guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath); + if (guard == null) { + return Futures.immediateFuture(Boolean.FALSE); + } + final long stampAfterGuard = System.nanoTime(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue()); + } + final ListenableFuture endResult = delegate.syncup(flowcapableNodePath, syncupEntry); + Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId), + MoreExecutors.directExecutor()); + return endResult; + } + + private FutureCallback createSyncupCallback(final Semaphore guard, + final long stampBeforeGuard, + final long stampAfterGuard, + final NodeId nodeId) { + return new FutureCallback() { + @Override + public void onSuccess(final Boolean result) { + if (LOG.isDebugEnabled()) { + final long stampFinished = System.nanoTime(); + LOG.debug("Syncup finished {} took:{} rpc:{} wait:{}", nodeId.getValue(), + formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard), + formatNanos(stampAfterGuard - stampBeforeGuard)); + } + semaphoreKeeper.releaseGuard(guard); + } + + @Override + public void onFailure(final Throwable failure) { + final long stampFinished = System.nanoTime(); + LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(), + formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard), + formatNanos(stampAfterGuard - stampBeforeGuard)); + semaphoreKeeper.releaseGuard(guard); + } + }; + } + + private static String formatNanos(final long nanos) { + return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'"; + } +}