Bug 6745 Improve compression queue locking and handle InterruptedException 25/46225/1
authorAndrej Leitner <andrej.leitner@pantheon.tech>
Tue, 27 Sep 2016 12:13:29 +0000 (14:13 +0200)
committerAndrej Leitner <andrej.leitner@pantheon.sk>
Tue, 27 Sep 2016 12:37:18 +0000 (12:37 +0000)
 - compression queue could be locked per device and not for every change
   since there is only one entry per device in it
 - IE handled in one place

Change-Id: Ic28b448d619c663757f391e7a443b28c8f6a871d
Signed-off-by: Andrej Leitner <andrej.leitner@pantheon.tech>
21 files changed:
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SyncReactor.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/AbstractFrmSyncListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorImpl.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProviderTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedConfigListenerTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListenerTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorClusterDecoratorTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecoratorTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorRetryDecoratorTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImplTest.java [moved from applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperTest.java with 97% similarity]

index 8185bb0b29a0b8aedcec952534eede9a21e2c624..4964f14919fa51c006d96dc3ff4308550a34557c 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.applications.frsync;
 
 import java.util.concurrent.Semaphore;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
  * Proposal for how a key based semaphore provider should look like.
@@ -36,5 +37,18 @@ public interface SemaphoreKeeper<K> {
      * @param key semaphore identifier
      * @return new or existing semaphore for given key, for one key there is always only one semaphore available
      */
-    Semaphore summonGuard(@Nonnull K key);
+    Semaphore summonGuard(@Nonnull final K key);
+
+    /**
+     * Get guard and lock for key.
+     * @param key for which guard should be created and acquired
+     * @return semaphore guard
+     */
+    Semaphore summonGuardAndAcquire(@Nonnull final K key);
+
+    /**
+     * Unlock and release guard.
+     * @param guard semaphore guard which should be released
+     */
+    void releaseGuard(@Nullable final Semaphore guard);
 }
index d38d7844323b0e370ab6e3186192b35a00e2b769..c7435f322cae0cd15bfe373938af55c325640ad6 100644 (file)
@@ -21,8 +21,7 @@ public interface SyncReactor {
      * @param flowcapableNodePath path to openflow augmentation of node
      * @param syncupEntry configured node + device reflection
      * @return synchronization outcome
-     * @throws InterruptedException
      */
     ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                     final SyncupEntry syncupEntry) throws InterruptedException;
+                                     final SyncupEntry syncupEntry);
 }
index 9ad1542e1c135681892c244cab5f358a034f2996..835823fba7fe5692a345cfd47076e7591c20a91f 100644 (file)
@@ -45,8 +45,6 @@ public abstract class AbstractFrmSyncListener<T extends DataObject> implements N
                         LOG.trace("Syncup for {} return from {} listener", nodeId.getValue(), dsType());
                     }
                 }
-            } catch (InterruptedException e) {
-                LOG.warn("Permit for forwarding rules sync not acquired: {}", nodeId.getValue());
             } catch (Exception e) {
                 LOG.error("Error processing inventory node modification: {}, {}", nodeId.getValue(), e);
             }
@@ -54,7 +52,7 @@ public abstract class AbstractFrmSyncListener<T extends DataObject> implements N
     }
 
     protected abstract Optional<ListenableFuture<Boolean>> processNodeModification(
-            final DataTreeModification<T> modification) throws InterruptedException;
+            final DataTreeModification<T> modification);
 
     protected abstract LogicalDatastoreType dsType();
 
index 642bd684e329e3aee677ce622ae4bb0b34c20b81..cdbec91a79d4e9c2e7d78d5a7bf974e0999f25db 100644 (file)
@@ -114,7 +114,8 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
         final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
                 new SemaphoreKeeperGuavaImpl<>(1, true));
-        final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
+        final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool,
+                new SemaphoreKeeperGuavaImpl<>(1, true));
 
         final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
 
@@ -138,7 +139,7 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
         LOG.info("ForwardingRulesSync has started.");
     }
 
-    public void close() throws InterruptedException {
+    public void close() {
         if (Objects.nonNull(dataTreeConfigChangeListener)) {
             dataTreeConfigChangeListener.close();
             dataTreeConfigChangeListener = null;
index e4964b99e0f75fb0e75acfeccb95e632625ace80..d9f9a361e363981e7f8a9e02763403b0570ca263 100644 (file)
@@ -52,10 +52,9 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
     /**
      * Update cache. If operational data are present, choose appropriate data and start syncup.
      * Otherwise skip incoming change.
-     * @throws InterruptedException from syncup
      */
     protected Optional<ListenableFuture<Boolean>> processNodeModification(
-            final DataTreeModification<FlowCapableNode> modification) throws InterruptedException {
+            final DataTreeModification<FlowCapableNode> modification) {
         final InstanceIdentifier<FlowCapableNode> nodePath = modification.getRootPath().getRootIdentifier();
         final NodeId nodeId = PathUtil.digNodeId(nodePath);
 
@@ -88,7 +87,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
      */
     private ListenableFuture<Boolean> onNodeAdded(final InstanceIdentifier<FlowCapableNode> nodePath,
                                                   final FlowCapableNode dataAfter,
-                                                  final FlowCapableNode operationalNode) throws InterruptedException {
+                                                  final FlowCapableNode operationalNode) {
         LOG.debug("Reconciliation {}: {}", dsType(), PathUtil.digNodeId(nodePath).getValue());
         final SyncupEntry syncupEntry = new SyncupEntry(dataAfter, dsType(), operationalNode, LogicalDatastoreType.OPERATIONAL);
         return reactor.syncup(nodePath, syncupEntry);
@@ -103,7 +102,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
      */
     private ListenableFuture<Boolean> onNodeUpdated(final InstanceIdentifier<FlowCapableNode> nodePath,
                                                     final FlowCapableNode dataBefore,
-                                                    final FlowCapableNode dataAfter) throws InterruptedException {
+                                                    final FlowCapableNode dataAfter) {
         final SyncupEntry syncupEntry = new SyncupEntry(dataAfter, dsType(), dataBefore, dsType());
         return reactor.syncup(nodePath, syncupEntry);
     }
@@ -113,7 +112,7 @@ public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapabl
      * Note, this could be probably optimized using dedicated wipe-out RPC.
      */
     private ListenableFuture<Boolean> onNodeDeleted(final InstanceIdentifier<FlowCapableNode> nodePath,
-                                                    final FlowCapableNode dataBefore) throws InterruptedException {
+                                                    final FlowCapableNode dataBefore) {
         final SyncupEntry syncupEntry = new SyncupEntry(null, dsType(), dataBefore, dsType());
         return reactor.syncup(nodePath, syncupEntry);
     }
index 326b1debea1f67f245d70f680f4504a8fdaa5120..3e9014de104ff0595913164eb837f2476bfa0ed7 100644 (file)
@@ -73,10 +73,9 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
     /**
      * Update cache, register for device mastership when device connected and start reconciliation if device
      * is registered and actual modification is consistent.Skip the event otherwise.
-     * @throws InterruptedException from syncup
      */
     protected Optional<ListenableFuture<Boolean>> processNodeModification(
-            final DataTreeModification<Node> modification) throws InterruptedException {
+            final DataTreeModification<Node> modification) {
         final NodeId nodeId = ModificationUtil.nodeId(modification);
         updateCache(modification);
 
@@ -150,10 +149,8 @@ public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node>
      * configuration (coming from operational) should be calculated and sent to device.
      * @param modification from DS
      * @return optional syncup future
-     * @throws InterruptedException from syncup
      */
-    private Optional<ListenableFuture<Boolean>> reconciliation(final DataTreeModification<Node> modification)
-            throws InterruptedException {
+    private Optional<ListenableFuture<Boolean>> reconciliation(final DataTreeModification<Node> modification) {
         final NodeId nodeId = ModificationUtil.nodeId(modification);
         final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
 
index 37cbb547f8ccfb2eda3c5fb3d0392c8a9d5e8048..c9de9b25ab92ca63bbb85e3b3fab530ff3b96892 100644 (file)
@@ -37,7 +37,7 @@ public class SyncReactorClusterDecorator implements SyncReactor {
 
     @Override
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) throws InterruptedException {
+                                            final SyncupEntry syncupEntry) {
         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
         if (!deviceMastershipManager.isDeviceMastered(nodeId)) {
             LOG.debug("Skip syncup since not master for: {}", nodeId.getValue());
index 585abaa93dbc50ce17f637eace2e61e801e1c117..51ef17dba0234cea08adf16292d7d7e90254ae9f 100644 (file)
@@ -36,7 +36,7 @@ public class SyncReactorFutureDecorator implements SyncReactor {
     }
 
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) throws InterruptedException {
+                                            final SyncupEntry syncupEntry) {
         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
         return executorService.submit(() -> {
             try {
@@ -49,7 +49,7 @@ public class SyncReactorFutureDecorator implements SyncReactor {
     }
 
     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                                         final SyncupEntry syncupEntry) throws InterruptedException {
+                                                         final SyncupEntry syncupEntry) {
         return delegate.syncup(flowcapableNodePath, syncupEntry);
     }
 }
index f6301dc4f13bb23679843727597b5858a7878d79..a21592792130187ac8be06bbaca42f0e8c0eab93 100644 (file)
@@ -13,8 +13,9 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Semaphore;
-import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -25,31 +26,38 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
  */
 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
 
-    @GuardedBy("compressionGuard")
     private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
-    private final Semaphore compressionGuard = new Semaphore(1, true);
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
 
-    public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
+    public SyncReactorFutureZipDecorator(final SyncReactor delegate,
+                                         final ListeningExecutorService executorService,
+                                         final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
         super(delegate, executorService);
+        this.semaphoreKeeper = semaphoreKeeper;
     }
 
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) throws InterruptedException {
+                                            final SyncupEntry syncupEntry) {
+        Semaphore guard = null;
         try {
-            compressionGuard.acquire();
+            guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+            if (Objects.isNull(guard)) {
+                return Futures.immediateFuture(Boolean.FALSE);
+            }
             final boolean newTaskNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
             if (newTaskNecessary) {
                 super.syncup(flowcapableNodePath, syncupEntry);
             }
             return Futures.immediateFuture(Boolean.TRUE);
         } finally {
-            compressionGuard.release();
+            semaphoreKeeper.releaseGuard(guard);
         }
     }
 
     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                                         final SyncupEntry syncupEntry) throws InterruptedException {
+                                                         final SyncupEntry syncupEntry) {
         final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
+
         if (lastCompressionState == null) {
             return Futures.immediateFuture(Boolean.TRUE);
         } else {
@@ -65,6 +73,7 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
     private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
                                            final SyncupEntry syncupEntry) {
         final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
+
         if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
             updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
         } else {
@@ -82,15 +91,15 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
     }
 
     private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+        Semaphore guard = null;
         try {
-            try {
-                compressionGuard.acquire();
-            } catch (InterruptedException e) {
+            guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+            if (Objects.isNull(guard)) {
                 return null;
             }
             return compressionQueue.remove(flowcapableNodePath);
         } finally {
-            compressionGuard.release();
+            semaphoreKeeper.releaseGuard(guard);
         }
     }
 }
\ No newline at end of file
index 26c8cf5e1bbe3a3245c3cf45cd54e49fa381c059..a6d1bef72da98b563e777875bb4159e66d6d43a7 100644 (file)
@@ -8,10 +8,10 @@
 
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Objects;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -41,29 +41,24 @@ public class SyncReactorGuardDecorator implements SyncReactor {
     }
 
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) throws InterruptedException {
+                                            final SyncupEntry syncupEntry) {
         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
         final long stampBeforeGuard = System.nanoTime();
-        final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);
-        if (guard == null) {
+        final Semaphore guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+        if (Objects.isNull(guard)) {
             return Futures.immediateFuture(Boolean.FALSE);
         }
         final long stampAfterGuard = System.nanoTime();
 
-        try {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue());
-            }
-            final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
-            Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId));
-            return endResult;
-        } catch (InterruptedException e) {
-            releaseGuard(guard);
-            throw e;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Syncup guard acquired and running for {} ", nodeId.getValue());
         }
+        final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
+        Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId));
+        return endResult;
     }
 
-    private static FutureCallback<Boolean> createSyncupCallback(final Semaphore guard,
+    private FutureCallback<Boolean> createSyncupCallback(final Semaphore guard,
                                                                 final long stampBeforeGuard,
                                                                 final long stampAfterGuard,
                                                                 final NodeId nodeId) {
@@ -76,7 +71,7 @@ public class SyncReactorGuardDecorator implements SyncReactor {
                             formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
                             formatNanos(stampAfterGuard - stampBeforeGuard));
                 }
-                releaseGuard(guard);
+                semaphoreKeeper.releaseGuard(guard);
             }
             @Override
             public void onFailure(final Throwable t) {
@@ -84,45 +79,11 @@ public class SyncReactorGuardDecorator implements SyncReactor {
                 LOG.warn("Syncup failed {} took:{} rpc:{} wait:{}", nodeId.getValue(),
                         formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
                         formatNanos(stampAfterGuard - stampBeforeGuard));
-                releaseGuard(guard);
+                semaphoreKeeper.releaseGuard(guard);
             }};
     }
 
     private static String formatNanos(final long nanos) {
         return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
     }
-
-    /**
-     * Get guard and lock for node.
-     * @param flowcapableNodePath II of node for which guard should be acquired
-     * @return semaphore guard
-     */
-    private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
-        final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),
-                "No guard for " + nodeId.getValue());
-        try {
-            guard.acquire();
-        } catch (InterruptedException e) {
-            LOG.warn("Syncup summon {} failed {}", nodeId.getValue(), e);
-            return null;
-        }
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Syncup summon {} guard:{}", nodeId.getValue(), guard);
-        }
-        return guard;
-    }
-
-    /**
-     * Unlock and release guard.
-     * @param guard semaphore guard which should be unlocked
-     */
-    private static void releaseGuard(final Semaphore guard) {
-        if (guard != null) {
-            guard.release();
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Syncup release guard:{} thread:{}", guard);
-            }
-        }
-    }
 }
index 6f64b8d4b1d053d827aff9dc1658fd48ffabb6df..272e97dcceceea7849f8c6902183931dd75da734 100644 (file)
@@ -100,7 +100,7 @@ public class SyncReactorImpl implements SyncReactor {
                     final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
                     final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
                     final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
-                    LOG.debug("syncup outcome[{}] (added/updated/removed): flow={}/{}/{}, group={}/{}/{}, " +
+                    LOG.debug("Syncup outcome[{}] (added/updated/removed): flow={}/{}/{}, group={}/{}/{}, " +
                                     "meter={}/{}/{}, errors={}",
                             nodeId.getValue(),
                             flowCrudCounts.getAdded(), flowCrudCounts.getUpdated(), flowCrudCounts.getRemoved(),
index e8c9b3ee71f98792239907ad587cc3b5ac3135d3..bf7e89682117bb50dc269205c0ca36d244d632ed 100644 (file)
@@ -36,7 +36,7 @@ public class SyncReactorRetryDecorator implements SyncReactor {
     }
 
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) throws InterruptedException {
+                                            final SyncupEntry syncupEntry) {
 
         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
         if (syncupEntry.isOptimizedConfigDelta() && reconciliationRegistry.isRegistered(nodeId)) {
index 240905bdf28929a5b9083bfd6a5d827487660f67..b506ddf17adb5d3ba8d927a1ff7cc495a7dca7e7 100644 (file)
@@ -8,18 +8,24 @@
 
 package org.opendaylight.openflowplugin.applications.frsync.util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import java.util.Objects;
 import java.util.concurrent.Semaphore;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Key-based semaphore provider.
  */
 public class SemaphoreKeeperGuavaImpl<K> implements SemaphoreKeeper<K> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperGuavaImpl.class);
     private final LoadingCache<K, Semaphore> semaphoreCache;
 
     public SemaphoreKeeperGuavaImpl(final int permits, final boolean fair) {
@@ -37,12 +43,26 @@ public class SemaphoreKeeperGuavaImpl<K> implements SemaphoreKeeper<K> {
     }
 
     @Override
-    public Semaphore summonGuard(final @Nonnull K key) {
+    public Semaphore summonGuard(@Nonnull final K key) {
         return semaphoreCache.getUnchecked(key);
     }
 
     @Override
-    public String toString() {
-        return super.toString() + " size:" + (semaphoreCache == null ? null : semaphoreCache.size()) + " " + semaphoreCache;
+    public Semaphore summonGuardAndAcquire(@Nonnull final K key) {
+        final Semaphore guard = Preconditions.checkNotNull(summonGuard(key), "Guard not available for " + key);
+        try {
+            guard.acquire();
+        } catch (InterruptedException e) {
+            LOG.warn("Could not acquire guard for {}, {}", key, e);
+            return null;
+        }
+        return guard;
+    }
+
+    @Override
+    public void releaseGuard(@Nullable final Semaphore guard) {
+        if (Objects.nonNull(guard)) {
+            guard.release();
+        }
     }
 }
index 888f31776109fed27b87a1b78235cf6107b371f9..fa79364664082213324db7e496f0b61bf5fc6e8a 100644 (file)
@@ -89,7 +89,7 @@ public class SimplifiedConfigListenerTest {
     }
 
     @Test
-    public void testOnDataTreeChangedAdd() throws InterruptedException {
+    public void testOnDataTreeChangedAdd() {
         Mockito.when(configModification.getDataBefore()).thenReturn(null);
         Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
         final SyncupEntry syncupEntry = loadOperationalDSAndPrepareSyncupEntry(dataAfter, confgDS, dataBefore, operationalDS);
@@ -102,7 +102,7 @@ public class SimplifiedConfigListenerTest {
     }
 
     @Test
-    public void testOnDataTreeChangedUpdate() throws InterruptedException {
+    public void testOnDataTreeChangedUpdate() {
         Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
         Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
         final SyncupEntry syncupEntry = loadOperationalDSAndPrepareSyncupEntry(dataAfter, confgDS, dataBefore, confgDS);
@@ -115,7 +115,7 @@ public class SimplifiedConfigListenerTest {
     }
 
     @Test
-    public void testOnDataTreeChangedDelete() throws InterruptedException {
+    public void testOnDataTreeChangedDelete() {
         Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
         Mockito.when(configModification.getDataAfter()).thenReturn(null);
         final SyncupEntry syncupEntry = loadOperationalDSAndPrepareSyncupEntry(null, confgDS, dataBefore, confgDS);
@@ -139,7 +139,7 @@ public class SimplifiedConfigListenerTest {
     }
 
     private SyncupEntry loadOperationalDSAndPrepareSyncupEntry(final FlowCapableNode after, final LogicalDatastoreType dsTypeAfter,
-                                                               final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) throws InterruptedException {
+                                                               final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) {
         Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
         final SyncupEntry syncupEntry = new SyncupEntry(after, dsTypeAfter, before, dsTypeBefore);
index 0f6a856ad07243590e438b8b5d964010948cd820..15790d97869418018058f1599deba0391a14ca70 100644 (file)
@@ -259,7 +259,7 @@ public class SimplifiedOperationalListenerTest {
     }
 
     private SyncupEntry loadConfigDSAndPrepareSyncupEntry(final FlowCapableNode after, final LogicalDatastoreType dsTypeAfter,
-                                                          final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) throws InterruptedException {
+                                                          final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) {
         Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
                 .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
         final SyncupEntry syncupEntry = new SyncupEntry(after, dsTypeAfter, before, dsTypeBefore);
index 3330dd44f91194c296ef431f8e30e05f2e8363a2..8b95fc009e6eca68a4b3c0cd6df1dc153bfacc56 100644 (file)
@@ -50,7 +50,7 @@ public class SyncReactorClusterDecoratorTest {
     }
 
     @Test
-    public void testSyncupMaster() throws InterruptedException {
+    public void testSyncupMaster() {
         Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(true);
 
         reactor.syncup(fcNodePath, syncupEntry);
@@ -60,7 +60,7 @@ public class SyncReactorClusterDecoratorTest {
     }
 
     @Test
-    public void testSyncupSlave() throws InterruptedException {
+    public void testSyncupSlave() {
         Mockito.when(deviceMastershipManager.isDeviceMastered(NODE_ID)).thenReturn(false);
 
         reactor.syncup(fcNodePath, syncupEntry);
index b300744294ca6335711bb102d064ab11395bfa49..5eca5c54be7de2555f606d113355457e85a24e95 100644 (file)
@@ -31,7 +31,9 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -63,13 +65,14 @@ public class SyncReactorFutureZipDecoratorTest {
 
     @Before
     public void setUp() {
+        final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
         final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
                 .setDaemon(false)
                 .setNameFormat("frsync-test-%d")
                 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
                 .build());
         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
-        reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
+        reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool, semaphoreKeeper);
         fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
                 .augmentation(FlowCapableNode.class);
     }
@@ -202,8 +205,8 @@ public class SyncReactorFutureZipDecoratorTest {
         Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, second);
     }
 
-    private void mockSyncupWithEntry(final SyncupEntry entry) throws InterruptedException {
-        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Mockito.eq(entry)))
+    private void mockSyncupWithEntry(final SyncupEntry entry) {
+        Mockito.when(delegate.syncup(Matchers.any(), Mockito.eq(entry)))
                 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
     }
 
index 5b43181d65d6bdedc117abcc9713ecb2d763728e..827ccf9b30fa1bdfb550a39622d5ef082e431909 100644 (file)
@@ -16,6 +16,7 @@ import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
@@ -47,7 +48,7 @@ public class SyncReactorGuardDecoratorTest {
 
     @Before
     public void setUp() throws Exception {
-        final SemaphoreKeeperGuavaImpl semaphoreKeeper = new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true);
+        final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
         reactor = new SyncReactorGuardDecorator(delegate, semaphoreKeeper);
         InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
         fcNodePath = nodePath.augmentation(FlowCapableNode.class);
@@ -58,8 +59,8 @@ public class SyncReactorGuardDecoratorTest {
     }
 
     @Test
-    public void testSyncupSuccess() throws Exception {
-        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<SyncupEntry>any()))
+    public void testSyncupSuccess() {
+        Mockito.when(delegate.syncup(Matchers.any(), Matchers.any()))
                 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
 
         reactor.syncup(fcNodePath, syncupEntry);
@@ -69,15 +70,13 @@ public class SyncReactorGuardDecoratorTest {
     }
 
     @Test
-    public void testSyncupFail() throws Exception {
-        Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<SyncupEntry>any()))
+    public void testSyncupFail() {
+        Mockito.when(delegate.syncup(Matchers.any(), Matchers.any()))
                 .thenReturn(Futures.immediateFailedFuture(new Exception()));
 
         reactor.syncup(fcNodePath, syncupEntry);
 
         Mockito.verify(delegate).syncup(fcNodePath, syncupEntry);
         Mockito.verifyNoMoreInteractions(delegate);
-
     }
-
 }
\ No newline at end of file
index 8151e620df889e0ebe4cb74e5e329f64d25ba88f..87f686c78a6d741faedd4c6740b5e9466c036e44 100644 (file)
@@ -51,7 +51,7 @@ public class SyncReactorRetryDecoratorTest {
     }
 
     @Test
-    public void testSyncupSuccess() throws InterruptedException {
+    public void testSyncupSuccess() {
         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<SyncupEntry>any()))
                 .thenReturn(Futures.immediateFuture(Boolean.TRUE));
 
@@ -63,7 +63,7 @@ public class SyncReactorRetryDecoratorTest {
     }
 
     @Test
-    public void testSyncupFail() throws InterruptedException {
+    public void testSyncupFail() {
         Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<SyncupEntry>any()))
                 .thenReturn(Futures.immediateFuture(Boolean.FALSE));
 
@@ -75,7 +75,7 @@ public class SyncReactorRetryDecoratorTest {
     }
 
     @Test
-    public void testSyncupConfigIgnoreInRetry() throws InterruptedException {
+    public void testSyncupConfigIgnoreInRetry() {
         Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
         Mockito.when(syncupEntry.isOptimizedConfigDelta()).thenReturn(true);
 
@@ -25,14 +25,14 @@ import org.slf4j.LoggerFactory;
 /**
  * Test for {@link SemaphoreKeeperGuavaImpl}.
  */
-public class SemaphoreKeeperTest {
-    private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperTest.class);
+public class SemaphoreKeeperGuavaImplTest {
+    private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperGuavaImplTest.class);
     private SemaphoreKeeperGuavaImpl<String> semaphoreKeeper;
     private final String key = "11";
 
     @Before
     public void setUp() throws Exception {
-        semaphoreKeeper = new SemaphoreKeeperGuavaImpl(1, true);
+        semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
     }
 
     @Test