From: Andrej Leitner Date: Tue, 27 Sep 2016 12:13:29 +0000 (+0200) Subject: Bug 6745 Improve compression queue locking and handle InterruptedException X-Git-Tag: release/boron-sr1~15 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=82b1671fb718bbf5b6a18761c41ce8f8be2e2898;p=openflowplugin.git Bug 6745 Improve compression queue locking and handle InterruptedException - compression queue could be locked per device and not for every change since there is only one entry per device in it - IE handled in one place Change-Id: Ic28b448d619c663757f391e7a443b28c8f6a871d Signed-off-by: Andrej Leitner --- diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java index 8185bb0b29..4964f14919 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java @@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.applications.frsync; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** * Proposal for how a key based semaphore provider should look like. @@ -36,5 +37,18 @@ public interface SemaphoreKeeper { * @param key semaphore identifier * @return new or existing semaphore for given key, for one key there is always only one semaphore available */ - Semaphore summonGuard(@Nonnull K key); + Semaphore summonGuard(@Nonnull final K key); + + /** + * Get guard and lock for key. + * @param key for which guard should be created and acquired + * @return semaphore guard + */ + Semaphore summonGuardAndAcquire(@Nonnull final K key); + + /** + * Unlock and release guard. + * @param guard semaphore guard which should be released + */ + void releaseGuard(@Nullable final Semaphore guard); } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java index d38d784432..c7435f322c 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java @@ -21,8 +21,7 @@ public interface SyncReactor { * @param flowcapableNodePath path to openflow augmentation of node * @param syncupEntry configured node + device reflection * @return synchronization outcome - * @throws InterruptedException */ ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException; + final SyncupEntry syncupEntry); } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/AbstractFrmSyncListener.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/AbstractFrmSyncListener.java index 9ad1542e1c..835823fba7 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/AbstractFrmSyncListener.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/AbstractFrmSyncListener.java @@ -45,8 +45,6 @@ public abstract class AbstractFrmSyncListener implements N LOG.trace("Syncup for {} return from {} listener", nodeId.getValue(), dsType()); } } - } catch (InterruptedException e) { - LOG.warn("Permit for forwarding rules sync not acquired: {}", nodeId.getValue()); } catch (Exception e) { LOG.error("Error processing inventory node modification: {}, {}", nodeId.getValue(), e); } @@ -54,7 +52,7 @@ public abstract class AbstractFrmSyncListener implements N } protected abstract Optional> processNodeModification( - final DataTreeModification modification) throws InterruptedException; + final DataTreeModification modification); protected abstract LogicalDatastoreType dsType(); diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java index 642bd684e3..cdbec91a79 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java @@ -114,7 +114,8 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry); final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry, new SemaphoreKeeperGuavaImpl<>(1, true)); - final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool); + final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool, + new SemaphoreKeeperGuavaImpl<>(1, true)); final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager); @@ -138,7 +139,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP LOG.info("ForwardingRulesSync has started."); } - public void close() throws InterruptedException { + public void close() { if (Objects.nonNull(dataTreeConfigChangeListener)) { dataTreeConfigChangeListener.close(); dataTreeConfigChangeListener = null; diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListener.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListener.java index e4964b99e0..d9f9a361e3 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListener.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListener.java @@ -52,10 +52,9 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener> processNodeModification( - final DataTreeModification modification) throws InterruptedException { + final DataTreeModification modification) { final InstanceIdentifier nodePath = modification.getRootPath().getRootIdentifier(); final NodeId nodeId = PathUtil.digNodeId(nodePath); @@ -88,7 +87,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener onNodeAdded(final InstanceIdentifier nodePath, final FlowCapableNode dataAfter, - final FlowCapableNode operationalNode) throws InterruptedException { + final FlowCapableNode operationalNode) { LOG.debug("Reconciliation {}: {}", dsType(), PathUtil.digNodeId(nodePath).getValue()); final SyncupEntry syncupEntry = new SyncupEntry(dataAfter, dsType(), operationalNode, LogicalDatastoreType.OPERATIONAL); return reactor.syncup(nodePath, syncupEntry); @@ -103,7 +102,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener onNodeUpdated(final InstanceIdentifier nodePath, final FlowCapableNode dataBefore, - final FlowCapableNode dataAfter) throws InterruptedException { + final FlowCapableNode dataAfter) { final SyncupEntry syncupEntry = new SyncupEntry(dataAfter, dsType(), dataBefore, dsType()); return reactor.syncup(nodePath, syncupEntry); } @@ -113,7 +112,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener onNodeDeleted(final InstanceIdentifier nodePath, - final FlowCapableNode dataBefore) throws InterruptedException { + final FlowCapableNode dataBefore) { final SyncupEntry syncupEntry = new SyncupEntry(null, dsType(), dataBefore, dsType()); return reactor.syncup(nodePath, syncupEntry); } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java index 326b1debea..3e9014de10 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java @@ -73,10 +73,9 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener /** * Update cache, register for device mastership when device connected and start reconciliation if device * is registered and actual modification is consistent.Skip the event otherwise. - * @throws InterruptedException from syncup */ protected Optional> processNodeModification( - final DataTreeModification modification) throws InterruptedException { + final DataTreeModification modification) { final NodeId nodeId = ModificationUtil.nodeId(modification); updateCache(modification); @@ -150,10 +149,8 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener * configuration (coming from operational) should be calculated and sent to device. * @param modification from DS * @return optional syncup future - * @throws InterruptedException from syncup */ - private Optional> reconciliation(final DataTreeModification modification) - throws InterruptedException { + private Optional> reconciliation(final DataTreeModification modification) { final NodeId nodeId = ModificationUtil.nodeId(modification); final Optional nodeConfiguration = configDao.loadByNodeId(nodeId); diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java index 37cbb547f8..c9de9b25ab 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java @@ -37,7 +37,7 @@ public class SyncReactorClusterDecorator implements SyncReactor { @Override public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException { + final SyncupEntry syncupEntry) { final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); if (!deviceMastershipManager.isDeviceMastered(nodeId)) { LOG.debug("Skip syncup since not master for: {}", nodeId.getValue()); diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java index 585abaa93d..51ef17dba0 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java @@ -36,7 +36,7 @@ public class SyncReactorFutureDecorator implements SyncReactor { } public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException { + final SyncupEntry syncupEntry) { final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); return executorService.submit(() -> { try { @@ -49,7 +49,7 @@ public class SyncReactorFutureDecorator implements SyncReactor { } protected ListenableFuture doSyncupInFuture(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException { + final SyncupEntry syncupEntry) { return delegate.syncup(flowcapableNodePath, syncupEntry); } } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java index f6301dc4f1..a215927921 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java @@ -13,8 +13,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Semaphore; -import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper; import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -25,31 +26,38 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; */ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator { - @GuardedBy("compressionGuard") private final Map, SyncupEntry> compressionQueue = new HashMap<>(); - private final Semaphore compressionGuard = new Semaphore(1, true); + private final SemaphoreKeeper> semaphoreKeeper; - public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) { + public SyncReactorFutureZipDecorator(final SyncReactor delegate, + final ListeningExecutorService executorService, + final SemaphoreKeeper> semaphoreKeeper) { super(delegate, executorService); + this.semaphoreKeeper = semaphoreKeeper; } public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException { + final SyncupEntry syncupEntry) { + Semaphore guard = null; try { - compressionGuard.acquire(); + guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath); + if (Objects.isNull(guard)) { + return Futures.immediateFuture(Boolean.FALSE); + } final boolean newTaskNecessary = updateCompressionState(flowcapableNodePath, syncupEntry); if (newTaskNecessary) { super.syncup(flowcapableNodePath, syncupEntry); } return Futures.immediateFuture(Boolean.TRUE); } finally { - compressionGuard.release(); + semaphoreKeeper.releaseGuard(guard); } } protected ListenableFuture doSyncupInFuture(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException { + final SyncupEntry syncupEntry) { final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath); + if (lastCompressionState == null) { return Futures.immediateFuture(Boolean.TRUE); } else { @@ -65,6 +73,7 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator { private boolean updateCompressionState(final InstanceIdentifier flowcapableNodePath, final SyncupEntry syncupEntry) { final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath); + if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) { updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry); } else { @@ -82,15 +91,15 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator { } private SyncupEntry removeLastCompressionState(final InstanceIdentifier flowcapableNodePath) { + Semaphore guard = null; try { - try { - compressionGuard.acquire(); - } catch (InterruptedException e) { + guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath); + if (Objects.isNull(guard)) { return null; } return compressionQueue.remove(flowcapableNodePath); } finally { - compressionGuard.release(); + semaphoreKeeper.releaseGuard(guard); } } } \ No newline at end of file diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java index 26c8cf5e1b..a6d1bef72d 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java @@ -8,10 +8,10 @@ package org.opendaylight.openflowplugin.applications.frsync.impl; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -41,29 +41,24 @@ public class SyncReactorGuardDecorator implements SyncReactor { } public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException { + final SyncupEntry syncupEntry) { final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); final long stampBeforeGuard = System.nanoTime(); - final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath); - if (guard == null) { + final Semaphore guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath); + if (Objects.isNull(guard)) { return Futures.immediateFuture(Boolean.FALSE); } final long stampAfterGuard = System.nanoTime(); - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue()); - } - final ListenableFuture endResult = delegate.syncup(flowcapableNodePath, syncupEntry); - Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId)); - return endResult; - } catch (InterruptedException e) { - releaseGuard(guard); - throw e; + if (LOG.isDebugEnabled()) { + LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue()); } + final ListenableFuture endResult = delegate.syncup(flowcapableNodePath, syncupEntry); + Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId)); + return endResult; } - private static FutureCallback createSyncupCallback(final Semaphore guard, + private FutureCallback createSyncupCallback(final Semaphore guard, final long stampBeforeGuard, final long stampAfterGuard, final NodeId nodeId) { @@ -76,7 +71,7 @@ public class SyncReactorGuardDecorator implements SyncReactor { formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard), formatNanos(stampAfterGuard - stampBeforeGuard)); } - releaseGuard(guard); + semaphoreKeeper.releaseGuard(guard); } @Override public void onFailure(final Throwable t) { @@ -84,45 +79,11 @@ public class SyncReactorGuardDecorator implements SyncReactor { LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(), formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard), formatNanos(stampAfterGuard - stampBeforeGuard)); - releaseGuard(guard); + semaphoreKeeper.releaseGuard(guard); }}; } private static String formatNanos(final long nanos) { return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'"; } - - /** - * Get guard and lock for node. - * @param flowcapableNodePath II of node for which guard should be acquired - * @return semaphore guard - */ - private Semaphore summonGuardAndAcquire(final InstanceIdentifier flowcapableNodePath) { - final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); - final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath), - "No guard for " + nodeId.getValue()); - try { - guard.acquire(); - } catch (InterruptedException e) { - LOG.warn("Syncup summon {} failed {}", nodeId.getValue(), e); - return null; - } - if (LOG.isTraceEnabled()) { - LOG.trace("Syncup summon {} guard:{}", nodeId.getValue(), guard); - } - return guard; - } - - /** - * Unlock and release guard. - * @param guard semaphore guard which should be unlocked - */ - private static void releaseGuard(final Semaphore guard) { - if (guard != null) { - guard.release(); - if (LOG.isTraceEnabled()) { - LOG.trace("Syncup release guard:{} thread:{}", guard); - } - } - } } diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java index 6f64b8d4b1..272e97dcce 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java @@ -100,7 +100,7 @@ public class SyncReactorImpl implements SyncReactor { final CrudCounts flowCrudCounts = counters.getFlowCrudCounts(); final CrudCounts meterCrudCounts = counters.getMeterCrudCounts(); final CrudCounts groupCrudCounts = counters.getGroupCrudCounts(); - LOG.debug("syncup outcome[{}] (added/updated/removed): flow={}/{}/{}, group={}/{}/{}, " + + LOG.debug("Syncup outcome[{}] (added/updated/removed): flow={}/{}/{}, group={}/{}/{}, " + "meter={}/{}/{}, errors={}", nodeId.getValue(), flowCrudCounts.getAdded(), flowCrudCounts.getUpdated(), flowCrudCounts.getRemoved(), diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java index e8c9b3ee71..bf7e896821 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java @@ -36,7 +36,7 @@ public class SyncReactorRetryDecorator implements SyncReactor { } public ListenableFuture syncup(final InstanceIdentifier flowcapableNodePath, - final SyncupEntry syncupEntry) throws InterruptedException { + final SyncupEntry syncupEntry) { final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath); if (syncupEntry.isOptimizedConfigDelta() && reconciliationRegistry.isRegistered(nodeId)) { diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java index 240905bdf2..b506ddf17a 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java @@ -8,18 +8,24 @@ package org.opendaylight.openflowplugin.applications.frsync.util; +import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import java.util.Objects; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Key-based semaphore provider. */ public class SemaphoreKeeperGuavaImpl implements SemaphoreKeeper { + private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperGuavaImpl.class); private final LoadingCache semaphoreCache; public SemaphoreKeeperGuavaImpl(final int permits, final boolean fair) { @@ -37,12 +43,26 @@ public class SemaphoreKeeperGuavaImpl implements SemaphoreKeeper { } @Override - public Semaphore summonGuard(final @Nonnull K key) { + public Semaphore summonGuard(@Nonnull final K key) { return semaphoreCache.getUnchecked(key); } @Override - public String toString() { - return super.toString() + " size:" + (semaphoreCache == null ? null : semaphoreCache.size()) + " " + semaphoreCache; + public Semaphore summonGuardAndAcquire(@Nonnull final K key) { + final Semaphore guard = Preconditions.checkNotNull(summonGuard(key), "Guard not available for " + key); + try { + guard.acquire(); + } catch (InterruptedException e) { + LOG.warn("Could not acquire guard for {}, {}", key, e); + return null; + } + return guard; + } + + @Override + public void releaseGuard(@Nullable final Semaphore guard) { + if (Objects.nonNull(guard)) { + guard.release(); + } } } diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java index a9a649462a..8ae21e2e19 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java @@ -74,7 +74,7 @@ public class ForwardingRulesSyncProviderTest { } @After - public void tearDown() throws InterruptedException { + public void tearDown() { provider.close(); } diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListenerTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListenerTest.java index 888f317761..fa79364664 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListenerTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListenerTest.java @@ -89,7 +89,7 @@ public class SimplifiedConfigListenerTest { } @Test - public void testOnDataTreeChangedAdd() throws InterruptedException { + public void testOnDataTreeChangedAdd() { Mockito.when(configModification.getDataBefore()).thenReturn(null); Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter); final SyncupEntry syncupEntry = loadOperationalDSAndPrepareSyncupEntry(dataAfter, confgDS, dataBefore, operationalDS); @@ -102,7 +102,7 @@ public class SimplifiedConfigListenerTest { } @Test - public void testOnDataTreeChangedUpdate() throws InterruptedException { + public void testOnDataTreeChangedUpdate() { Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore); Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter); final SyncupEntry syncupEntry = loadOperationalDSAndPrepareSyncupEntry(dataAfter, confgDS, dataBefore, confgDS); @@ -115,7 +115,7 @@ public class SimplifiedConfigListenerTest { } @Test - public void testOnDataTreeChangedDelete() throws InterruptedException { + public void testOnDataTreeChangedDelete() { Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore); Mockito.when(configModification.getDataAfter()).thenReturn(null); final SyncupEntry syncupEntry = loadOperationalDSAndPrepareSyncupEntry(null, confgDS, dataBefore, confgDS); @@ -139,7 +139,7 @@ public class SimplifiedConfigListenerTest { } private SyncupEntry loadOperationalDSAndPrepareSyncupEntry(final FlowCapableNode after, final LogicalDatastoreType dsTypeAfter, - final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) throws InterruptedException { + final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) { Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath)) .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore))); final SyncupEntry syncupEntry = new SyncupEntry(after, dsTypeAfter, before, dsTypeBefore); diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListenerTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListenerTest.java index 0f6a856ad0..15790d9786 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListenerTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListenerTest.java @@ -259,7 +259,7 @@ public class SimplifiedOperationalListenerTest { } private SyncupEntry loadConfigDSAndPrepareSyncupEntry(final FlowCapableNode after, final LogicalDatastoreType dsTypeAfter, - final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) throws InterruptedException { + final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) { Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath)) .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode))); final SyncupEntry syncupEntry = new SyncupEntry(after, dsTypeAfter, before, dsTypeBefore); diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java index 3330dd44f9..8b95fc009e 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java @@ -50,7 +50,7 @@ public class SyncReactorClusterDecoratorTest { } @Test - public void testSyncupMaster() throws InterruptedException { + public void testSyncupMaster() { Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(true); reactor.syncup(fcNodePath, syncupEntry); @@ -60,7 +60,7 @@ public class SyncReactorClusterDecoratorTest { } @Test - public void testSyncupSlave() throws InterruptedException { + public void testSyncupSlave() { Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(false); reactor.syncup(fcNodePath, syncupEntry); diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java index b300744294..5eca5c54be 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java @@ -31,7 +31,9 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper; import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; +import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl; import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; @@ -63,13 +65,14 @@ public class SyncReactorFutureZipDecoratorTest { @Before public void setUp() { + final SemaphoreKeeper> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true); final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setDaemon(false) .setNameFormat("frsync-test-%d") .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e)) .build()); syncThreadPool = MoreExecutors.listeningDecorator(executorService); - reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool); + reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool, semaphoreKeeper); fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID)) .augmentation(FlowCapableNode.class); } @@ -202,8 +205,8 @@ public class SyncReactorFutureZipDecoratorTest { Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, second); } - private void mockSyncupWithEntry(final SyncupEntry entry) throws InterruptedException { - Mockito.when(delegate.syncup(Matchers.>any(), Mockito.eq(entry))) + private void mockSyncupWithEntry(final SyncupEntry entry) { + Mockito.when(delegate.syncup(Matchers.any(), Mockito.eq(entry))) .thenReturn(Futures.immediateFuture(Boolean.TRUE)); } diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecoratorTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecoratorTest.java index 5b43181d65..827ccf9b30 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecoratorTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecoratorTest.java @@ -16,6 +16,7 @@ import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; +import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper; import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl; import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry; @@ -47,7 +48,7 @@ public class SyncReactorGuardDecoratorTest { @Before public void setUp() throws Exception { - final SemaphoreKeeperGuavaImpl semaphoreKeeper = new SemaphoreKeeperGuavaImpl>(1, true); + final SemaphoreKeeper> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true); reactor = new SyncReactorGuardDecorator(delegate, semaphoreKeeper); InstanceIdentifier nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID)); fcNodePath = nodePath.augmentation(FlowCapableNode.class); @@ -58,8 +59,8 @@ public class SyncReactorGuardDecoratorTest { } @Test - public void testSyncupSuccess() throws Exception { - Mockito.when(delegate.syncup(Matchers.>any(), Matchers.any())) + public void testSyncupSuccess() { + Mockito.when(delegate.syncup(Matchers.any(), Matchers.any())) .thenReturn(Futures.immediateFuture(Boolean.TRUE)); reactor.syncup(fcNodePath, syncupEntry); @@ -69,15 +70,13 @@ public class SyncReactorGuardDecoratorTest { } @Test - public void testSyncupFail() throws Exception { - Mockito.when(delegate.syncup(Matchers.>any(), Matchers.any())) + public void testSyncupFail() { + Mockito.when(delegate.syncup(Matchers.any(), Matchers.any())) .thenReturn(Futures.immediateFailedFuture(new Exception())); reactor.syncup(fcNodePath, syncupEntry); Mockito.verify(delegate).syncup(fcNodePath, syncupEntry); Mockito.verifyNoMoreInteractions(delegate); - } - } \ No newline at end of file diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java index 8151e620df..87f686c78a 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java @@ -51,7 +51,7 @@ public class SyncReactorRetryDecoratorTest { } @Test - public void testSyncupSuccess() throws InterruptedException { + public void testSyncupSuccess() { Mockito.when(delegate.syncup(Matchers.>any(), Matchers.any())) .thenReturn(Futures.immediateFuture(Boolean.TRUE)); @@ -63,7 +63,7 @@ public class SyncReactorRetryDecoratorTest { } @Test - public void testSyncupFail() throws InterruptedException { + public void testSyncupFail() { Mockito.when(delegate.syncup(Matchers.>any(), Matchers.any())) .thenReturn(Futures.immediateFuture(Boolean.FALSE)); @@ -75,7 +75,7 @@ public class SyncReactorRetryDecoratorTest { } @Test - public void testSyncupConfigIgnoreInRetry() throws InterruptedException { + public void testSyncupConfigIgnoreInRetry() { Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true); Mockito.when(syncupEntry.isOptimizedConfigDelta()).thenReturn(true); diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImplTest.java similarity index 97% rename from applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperTest.java rename to applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImplTest.java index 510dce07d1..efcaa3de41 100644 --- a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperTest.java +++ b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImplTest.java @@ -25,14 +25,14 @@ import org.slf4j.LoggerFactory; /** * Test for {@link SemaphoreKeeperGuavaImpl}. */ -public class SemaphoreKeeperTest { - private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperTest.class); +public class SemaphoreKeeperGuavaImplTest { + private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperGuavaImplTest.class); private SemaphoreKeeperGuavaImpl semaphoreKeeper; private final String key = "11"; @Before public void setUp() throws Exception { - semaphoreKeeper = new SemaphoreKeeperGuavaImpl(1, true); + semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true); } @Test