2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import java.util.HashMap;
16 import java.util.Objects;
17 import java.util.concurrent.Semaphore;
18 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
19 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
20 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
22 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
25 * Enriches {@link SyncReactorFutureDecorator} with state compression.
27 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
29 private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
30 private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
32 public SyncReactorFutureZipDecorator(final SyncReactor delegate,
33 final ListeningExecutorService executorService,
34 final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
35 super(delegate, executorService);
36 this.semaphoreKeeper = semaphoreKeeper;
39 public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
40 final SyncupEntry syncupEntry) {
41 Semaphore guard = null;
43 guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
44 if (Objects.isNull(guard)) {
45 return Futures.immediateFuture(Boolean.FALSE);
47 final boolean newTaskNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
48 if (newTaskNecessary) {
49 super.syncup(flowcapableNodePath, syncupEntry);
51 return Futures.immediateFuture(Boolean.TRUE);
53 semaphoreKeeper.releaseGuard(guard);
57 protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
58 final SyncupEntry syncupEntry) {
59 final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
61 if (lastCompressionState == null) {
62 return Futures.immediateFuture(Boolean.TRUE);
64 return super.doSyncupInFuture(flowcapableNodePath, lastCompressionState);
69 * If a syncup entry for corresponding the device is present in compression queue and new configuration diff is
70 * coming - update the entry in compression queue (zip). Create new (no entry in queue for device) or replace
71 * entry (config vs. operational is coming) in queue otherwise.
73 private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
74 final SyncupEntry syncupEntry) {
75 final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
77 if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
78 updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
80 compressionQueue.put(flowcapableNodePath, syncupEntry);
82 return previousEntry == null;
85 private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
86 final SyncupEntry actual,
87 final SyncupEntry previous) {
88 final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
89 previous.getBefore(), previous.getDsTypeBefore());
90 compressionQueue.put(flowcapableNodePath, updatedEntry);
93 private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
94 Semaphore guard = null;
96 guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
97 if (Objects.isNull(guard)) {
100 return compressionQueue.remove(flowcapableNodePath);
102 semaphoreKeeper.releaseGuard(guard);