Merge "Bug 6745 Improve compression queue locking and handle InterruptedException"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecorator.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.Objects;
17 import java.util.concurrent.Semaphore;
18 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
19 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
20 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
22 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
23
24 /**
25  * Enriches {@link SyncReactorFutureDecorator} with state compression.
26  */
27 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
28
29     private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
30     private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
31
32     public SyncReactorFutureZipDecorator(final SyncReactor delegate,
33                                          final ListeningExecutorService executorService,
34                                          final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
35         super(delegate, executorService);
36         this.semaphoreKeeper = semaphoreKeeper;
37     }
38
39     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
40                                             final SyncupEntry syncupEntry) {
41         Semaphore guard = null;
42         try {
43             guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
44             if (Objects.isNull(guard)) {
45                 return Futures.immediateFuture(Boolean.FALSE);
46             }
47             final boolean newTaskNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
48             if (newTaskNecessary) {
49                 super.syncup(flowcapableNodePath, syncupEntry);
50             }
51             return Futures.immediateFuture(Boolean.TRUE);
52         } finally {
53             semaphoreKeeper.releaseGuard(guard);
54         }
55     }
56
57     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
58                                                          final SyncupEntry syncupEntry) {
59         final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
60
61         if (lastCompressionState == null) {
62             return Futures.immediateFuture(Boolean.TRUE);
63         } else {
64             return super.doSyncupInFuture(flowcapableNodePath, lastCompressionState);
65         }
66     }
67
68     /**
69      * If a syncup entry for corresponding the device is present in compression queue and new configuration diff is
70      * coming - update the entry in compression queue (zip). Create new (no entry in queue for device) or replace
71      * entry (config vs. operational is coming) in queue otherwise.
72      */
73     private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
74                                            final SyncupEntry syncupEntry) {
75         final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
76
77         if (previousEntry != null && syncupEntry.isOptimizedConfigDelta()) {
78             updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
79         } else {
80             compressionQueue.put(flowcapableNodePath, syncupEntry);
81         }
82         return previousEntry == null;
83     }
84
85     private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
86                                             final SyncupEntry actual,
87                                             final SyncupEntry previous) {
88         final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
89                                                          previous.getBefore(), previous.getDsTypeBefore());
90         compressionQueue.put(flowcapableNodePath, updatedEntry);
91     }
92
93     private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
94         Semaphore guard = null;
95         try {
96             guard = semaphoreKeeper.summonGuardAndAcquire(flowcapableNodePath);
97             if (Objects.isNull(guard)) {
98                 return null;
99             }
100             return compressionQueue.remove(flowcapableNodePath);
101         } finally {
102             semaphoreKeeper.releaseGuard(guard);
103         }
104     }
105 }