Create SemaphoreKeeper inside decorators 44/46244/1
authorAndrej Leitner <andrej.leitner@pantheon.tech>
Wed, 28 Sep 2016 07:17:40 +0000 (09:17 +0200)
committerAndrej Leitner <andrej.leitner@pantheon.tech>
Wed, 28 Sep 2016 09:27:28 +0000 (11:27 +0200)
Change-Id: I29fb2da53d32b331459a4492ca15a4081ea79195
Signed-off-by: Andrej Leitner <andrej.leitner@pantheon.tech>
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/SemaphoreKeeper.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/ForwardingRulesSyncProvider.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecorator.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/SemaphoreKeeperGuavaImpl.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorFutureZipDecoratorTest.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/SyncReactorGuardDecoratorTest.java

index 4964f14919fa51c006d96dc3ff4308550a34557c..9f31ad541796d45ebe3bb7f889c7016753595da0 100644 (file)
@@ -13,27 +13,14 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 /**
- * Proposal for how a key based semaphore provider should look like.
- * <ul>
- * <li>thread safe</li>
- * <li>garbage-collect unused semaphores</li>
- * <li>for the same key there must be always only one semaphore available</li>
- * </ul>
- *
- *
- * usage:
- * <pre>
- * final Semaphore guard = semaphoreKeeper.summonGuard(key);
- * guard.acquire();
- * // guard protected logic ...
- * guard.release();
- * </pre>
- *
+ * Key based semaphore provider.
+ * For the same key there is always only one semaphore available. Unused semaphores are garbage-collect.
  * @param <K> key type
  */
 
 public interface SemaphoreKeeper<K> {
     /**
+     * Create or load semaphore for key from cache.
      * @param key semaphore identifier
      * @return new or existing semaphore for given key, for one key there is always only one semaphore available
      */
index cdbec91a79d4e9c2e7d78d5a7bf974e0999f25db..fe4f43b3c6d85123d6bbd052fcf775b5e05b8441 100644 (file)
@@ -34,7 +34,6 @@ import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.Devic
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.TableForwarder;
 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
-import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
@@ -112,10 +111,8 @@ public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareP
 
         final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
         final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
-        final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
-                new SemaphoreKeeperGuavaImpl<>(1, true));
-        final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool,
-                new SemaphoreKeeperGuavaImpl<>(1, true));
+        final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry);
+        final SyncReactor syncReactorFutureZip = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
 
         final SyncReactor reactor = new SyncReactorClusterDecorator(syncReactorFutureZip, deviceMastershipManager);
 
index 51ef17dba0234cea08adf16292d7d7e90254ae9f..5bb10906071b0405ff9e861712f36f549e764c42 100644 (file)
@@ -42,7 +42,7 @@ public class SyncReactorFutureDecorator implements SyncReactor {
             try {
                 return doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS);
             } catch (TimeoutException e) {
-                LOG.warn("Syncup future timeout occured {}", nodeId.getValue(), e);
+                LOG.warn("Syncup future timeout occured {}", nodeId.getValue());
                 return Boolean.FALSE;
             }
         });
index a21592792130187ac8be06bbaca42f0e8c0eab93..384b0c6327531b7c2d556010332ff10364213583 100644 (file)
@@ -17,6 +17,7 @@ import java.util.Objects;
 import java.util.concurrent.Semaphore;
 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -27,13 +28,11 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
 
     private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
-    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+            new SemaphoreKeeperGuavaImpl<>(1, true);
 
-    public SyncReactorFutureZipDecorator(final SyncReactor delegate,
-                                         final ListeningExecutorService executorService,
-                                         final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+    public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
         super(delegate, executorService);
-        this.semaphoreKeeper = semaphoreKeeper;
     }
 
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
index a6d1bef72da98b563e777875bb4159e66d6d43a7..f4ee57fdde6d860404148191ae6b66ccd6f4bc48 100644 (file)
@@ -18,6 +18,7 @@ import javax.annotation.Nullable;
 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -32,12 +33,11 @@ public class SyncReactorGuardDecorator implements SyncReactor {
 
     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
     private final SyncReactor delegate;
-    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+            new SemaphoreKeeperGuavaImpl<>(1, true);
 
-    public SyncReactorGuardDecorator(final SyncReactor delegate,
-                                     final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+    public SyncReactorGuardDecorator(final SyncReactor delegate) {
         this.delegate = delegate;
-        this.semaphoreKeeper = semaphoreKeeper;
     }
 
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
@@ -59,9 +59,9 @@ public class SyncReactorGuardDecorator implements SyncReactor {
     }
 
     private FutureCallback<Boolean> createSyncupCallback(final Semaphore guard,
-                                                                final long stampBeforeGuard,
-                                                                final long stampAfterGuard,
-                                                                final NodeId nodeId) {
+                                                         final long stampBeforeGuard,
+                                                         final long stampAfterGuard,
+                                                         final NodeId nodeId) {
         return new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(@Nullable final Boolean result) {
index 85f04d438d6f1162e4d8c7f5e7118a51bc247626..9bc5fb7dcf90dbfc696e403e97cc9496d5fcbbb8 100644 (file)
@@ -268,7 +268,7 @@ public final class ReconcileUtil {
      * @param gatherUpdates      check content of pending item if present on device (and create update task eventually)
      * @return list of safe synchronization steps
      */
-    public static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
+    private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
                                                             final Map<FlowDescriptor, Flow> flowOperationalMap,
                                                             final boolean gatherUpdates) {
         final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
index b506ddf17adb5d3ba8d927a1ff7cc495a7dca7e7..12040c58112d620f1f1e208127d046fca7f9305f 100644 (file)
@@ -20,9 +20,6 @@ import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Key-based semaphore provider.
- */
 public class SemaphoreKeeperGuavaImpl<K> implements SemaphoreKeeper<K> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperGuavaImpl.class);
index 5eca5c54be7de2555f606d113355457e85a24e95..6b59c9b3e22474c496d5ecab7ed497377722ce95 100644 (file)
@@ -31,9 +31,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -65,14 +63,13 @@ public class SyncReactorFutureZipDecoratorTest {
 
     @Before
     public void setUp() {
-        final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
         final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
                 .setDaemon(false)
                 .setNameFormat("frsync-test-%d")
                 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
                 .build());
         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
-        reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool, semaphoreKeeper);
+        reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
         fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
                 .augmentation(FlowCapableNode.class);
     }
index 827ccf9b30fa1bdfb550a39622d5ef082e431909..099d45cc2fc8ada72eb57a00d0c545a54eca29c0 100644 (file)
@@ -16,9 +16,7 @@ import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -48,8 +46,7 @@ public class SyncReactorGuardDecoratorTest {
 
     @Before
     public void setUp() throws Exception {
-        final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper = new SemaphoreKeeperGuavaImpl<>(1, true);
-        reactor = new SyncReactorGuardDecorator(delegate, semaphoreKeeper);
+        reactor = new SyncReactorGuardDecorator(delegate);
         InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
         fcNodePath = nodePath.augmentation(FlowCapableNode.class);