import java.util.Map;
import java.util.concurrent.Semaphore;
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
-import org.opendaylight.openflowplugin.applications.frsync.util.ZipQueueEntry;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Enriches {@link SyncReactorFutureDecorator} with state compression.
*/
public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
- private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
-
@GuardedBy("compressionGuard")
- private final Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> compressionQueue = new HashMap<>();
- private final Semaphore compressionGuard = new Semaphore(1, false);
+ private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
+ private final Semaphore compressionGuard = new Semaphore(1, true);
- public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+ public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
super(delegate, executorService);
}
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,
- final LogicalDatastoreType dsType) throws InterruptedException {
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("syncup zip {}", nodeId.getValue());
-
+ final SyncupEntry syncupEntry) throws InterruptedException {
try {
compressionGuard.acquire();
-
- final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree, dsType);
- if (newFutureNecessary) {
- super.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+ final boolean newTaskNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
+ if (newTaskNecessary) {
+ super.syncup(flowcapableNodePath, syncupEntry);
}
- return Futures.immediateFuture(true);
+ return Futures.immediateFuture(Boolean.TRUE);
} finally {
compressionGuard.release();
}
}
protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,
- final LogicalDatastoreType dsType) throws InterruptedException {
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("doSyncupInFuture zip {}", nodeId.getValue());
-
- final ZipQueueEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
+ final SyncupEntry syncupEntry) throws InterruptedException {
+ final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
if (lastCompressionState == null) {
- return Futures.immediateFuture(true);
+ return Futures.immediateFuture(Boolean.TRUE);
} else {
- return super.doSyncupInFuture(flowcapableNodePath,
- lastCompressionState.getLeft(), lastCompressionState.getRight(), dsType);
+ return super.doSyncupInFuture(flowcapableNodePath, lastCompressionState);
}
}
/**
- * If there is config delta in compression queue for the device and new configuration is coming,
- * update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
+ * If a syncup entry for corresponding the device is present in compression queue and new configuration diff is
+ * coming - update the entry in compression queue (zip). Create new (no entry in queue for device) or replace
+ * entry (config vs. operational is coming) in queue otherwise.
*/
private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,
- final LogicalDatastoreType dsType) {
- final ZipQueueEntry previousEntry = compressionQueue.get(flowcapableNodePath);
-
- if (previousEntry != null && dsType == LogicalDatastoreType.CONFIGURATION
- && previousEntry.getDsType() == LogicalDatastoreType.CONFIGURATION) {
- putOptimizedConfigDelta(flowcapableNodePath, configTree, previousEntry);
+ final SyncupEntry syncupEntry) {
+ final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
+ if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
+ updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
} else {
- putLatestOperationalDelta(flowcapableNodePath, configTree, operationalTree, dsType);
+ compressionQueue.put(flowcapableNodePath, syncupEntry);
}
return previousEntry == null;
}
- private void putOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
- ZipQueueEntry previous) {
- compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, previous.getRight(), previous.getDsType()));
+ private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final SyncupEntry actual,
+ final SyncupEntry previous) {
+ final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
+ previous.getBefore(), previous.getDsTypeBefore());
+ compressionQueue.put(flowcapableNodePath, updatedEntry);
}
- private void putLatestOperationalDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
- FlowCapableNode operationalTree, LogicalDatastoreType dsType) {
- compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, operationalTree, dsType));
- }
-
- private ZipQueueEntry removeLastCompressionState(
- final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+ private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
try {
try {
compressionGuard.acquire();
} catch (InterruptedException e) {
return null;
}
-
return compressionQueue.remove(flowcapableNodePath);
} finally {
compressionGuard.release();
}
}
-
}
\ No newline at end of file