Remove Objects.{is,non}Null abuse
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecorator.java
index 060a04964c02aff5ba2f9a5ca79604e9bc67c87c..8f49b42a9dfb2ff21abd255115d1fd1f29b00a9c 100644 (file)
@@ -1,11 +1,10 @@
-/**
+/*
  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
 import com.google.common.util.concurrent.Futures;
@@ -14,71 +13,67 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
-import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Enriches {@link SyncReactorFutureDecorator} with state compression.
  */
 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
-
-    @GuardedBy("compressionGuard")
     private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
-    private final Semaphore compressionGuard = new Semaphore(1, false);
+    private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper =
+            new SemaphoreKeeperGuavaImpl<>(1, true);
 
-    public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+    public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
         super(delegate, executorService);
     }
 
+    @Override
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final SyncupEntry syncupEntry) throws InterruptedException {
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
-        LOG.trace("syncup zip decorator: {}", nodeId.getValue());
-
+                                            final SyncupEntry syncupEntry) {
+        Semaphore guard = null;
         try {
-            compressionGuard.acquire();
-
-            final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
-            if (newFutureNecessary) {
+            guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+            if (guard == null) {
+                return Futures.immediateFuture(Boolean.FALSE);
+            }
+            final boolean newTaskNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
+            if (newTaskNecessary) {
                 super.syncup(flowcapableNodePath, syncupEntry);
             }
-            return Futures.immediateFuture(true);
+            return Futures.immediateFuture(Boolean.TRUE);
         } finally {
-            compressionGuard.release();
+            semaphoreKeeper.releaseGuard(guard);
         }
     }
 
+    @Override
     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                                         final SyncupEntry syncupEntry) throws InterruptedException {
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
-        LOG.trace("doSyncupInFuture zip decorator: {}", nodeId.getValue());
-
+                                                         final SyncupEntry syncupEntry) {
         final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
+
         if (lastCompressionState == null) {
-            return Futures.immediateFuture(true);
+            return Futures.immediateFuture(Boolean.TRUE);
         } else {
             return super.doSyncupInFuture(flowcapableNodePath, lastCompressionState);
         }
     }
 
     /**
-     * If there is config delta in compression queue for the device and new configuration is coming,
-     * update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
+     * If a syncup entry for corresponding the device is present in compression queue and new configuration diff is
+     * coming - update the entry in compression queue (zip). Create new (no entry in queue for device) or replace
+     * entry (config vs. operational is coming) in queue otherwise.
      */
     private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
                                            final SyncupEntry syncupEntry) {
         final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
 
-        if (previousEntry != null && syncupEntry.isOptimizedConfigDelta() && previousEntry.isOptimizedConfigDelta()) {
+        if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
             updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
         } else {
             compressionQueue.put(flowcapableNodePath, syncupEntry);
@@ -86,25 +81,21 @@ public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
         return previousEntry == null;
     }
 
-    private void updateOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, SyncupEntry actual,
-                                            SyncupEntry previous) {
-        compressionQueue.put(flowcapableNodePath, new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
-                                                                  previous.getBefore(), previous.getDsTypeBefore()));
+    private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                            final SyncupEntry actual,
+                                            final SyncupEntry previous) {
+        final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
+                                                         previous.getBefore(), previous.getDsTypeBefore());
+        compressionQueue.put(flowcapableNodePath, updatedEntry);
     }
 
-    private SyncupEntry removeLastCompressionState(
-            final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+    private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+        Semaphore guard = null;
         try {
-            try {
-                compressionGuard.acquire();
-            } catch (InterruptedException e) {
-                return null;
-            }
-
-            return compressionQueue.remove(flowcapableNodePath);
+            guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
+            return guard == null ? null : compressionQueue.remove(flowcapableNodePath);
         } finally {
-            compressionGuard.release();
+            semaphoreKeeper.releaseGuard(guard);
         }
     }
-
 }
\ No newline at end of file