Merge "Revert "Bug 6745 Do not ignore syncup return value""
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecorator.java
index a3edde05a884882b7f86eaa5029012346f1af9af..f6301dc4f13bb23679843727597b5858a7878d79 100644 (file)
@@ -15,106 +15,82 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
-import org.opendaylight.openflowplugin.applications.frsync.util.ZipQueueEntry;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Enriches {@link SyncReactorFutureDecorator} with state compression.
  */
 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
-
     @GuardedBy("compressionGuard")
-    private final Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> compressionQueue = new HashMap<>();
-    private final Semaphore compressionGuard = new Semaphore(1, false);
+    private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
+    private final Semaphore compressionGuard = new Semaphore(1, true);
 
-    public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+    public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
         super(delegate, executorService);
     }
 
     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,
-                                            final LogicalDatastoreType dsType) throws InterruptedException {
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
-        LOG.trace("syncup zip {}", nodeId.getValue());
-
+                                            final SyncupEntry syncupEntry) throws InterruptedException {
         try {
             compressionGuard.acquire();
-
-            final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree, dsType);
-            if (newFutureNecessary) {
-                super.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+            final boolean newTaskNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
+            if (newTaskNecessary) {
+                super.syncup(flowcapableNodePath, syncupEntry);
             }
-            return Futures.immediateFuture(true);
+            return Futures.immediateFuture(Boolean.TRUE);
         } finally {
             compressionGuard.release();
         }
     }
 
     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                                         final FlowCapableNode configTree, final FlowCapableNode operationalTree,
-                                                         final LogicalDatastoreType dsType) throws InterruptedException {
-        final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
-        LOG.trace("doSyncupInFuture zip {}", nodeId.getValue());
-
-        final ZipQueueEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
+                                                         final SyncupEntry syncupEntry) throws InterruptedException {
+        final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
         if (lastCompressionState == null) {
-            return Futures.immediateFuture(true);
+            return Futures.immediateFuture(Boolean.TRUE);
         } else {
-            return super.doSyncupInFuture(flowcapableNodePath,
-                    lastCompressionState.getLeft(), lastCompressionState.getRight(), dsType);
+            return super.doSyncupInFuture(flowcapableNodePath, lastCompressionState);
         }
     }
 
     /**
-     * If there is config delta in compression queue for the device and new configuration is coming,
-     * update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
+     * If a syncup entry for corresponding the device is present in compression queue and new configuration diff is
+     * coming - update the entry in compression queue (zip). Create new (no entry in queue for device) or replace
+     * entry (config vs. operational is coming) in queue otherwise.
      */
     private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
-                                           final FlowCapableNode configTree, final FlowCapableNode operationalTree,
-                                           final LogicalDatastoreType dsType) {
-        final ZipQueueEntry previousEntry = compressionQueue.get(flowcapableNodePath);
-
-        if (previousEntry != null && dsType == LogicalDatastoreType.CONFIGURATION
-                && previousEntry.getDsType() == LogicalDatastoreType.CONFIGURATION) {
-            putOptimizedConfigDelta(flowcapableNodePath, configTree, previousEntry);
+                                           final SyncupEntry syncupEntry) {
+        final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
+        if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
+            updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
         } else {
-            putLatestOperationalDelta(flowcapableNodePath, configTree, operationalTree, dsType);
+            compressionQueue.put(flowcapableNodePath, syncupEntry);
         }
         return previousEntry == null;
     }
 
-    private void putOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
-                                         ZipQueueEntry previous) {
-        compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, previous.getRight(), previous.getDsType()));
+    private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+                                            final SyncupEntry actual,
+                                            final SyncupEntry previous) {
+        final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
+                                                         previous.getBefore(), previous.getDsTypeBefore());
+        compressionQueue.put(flowcapableNodePath, updatedEntry);
     }
 
-    private void putLatestOperationalDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
-                                           FlowCapableNode operationalTree, LogicalDatastoreType dsType) {
-        compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, operationalTree, dsType));
-    }
-
-    private ZipQueueEntry removeLastCompressionState(
-            final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+    private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
         try {
             try {
                 compressionGuard.acquire();
             } catch (InterruptedException e) {
                 return null;
             }
-
             return compressionQueue.remove(flowcapableNodePath);
         } finally {
             compressionGuard.release();
         }
     }
-
 }
\ No newline at end of file