Merge "Removed duplicate declaration in pom.xml."
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecorator.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.concurrent.Semaphore;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
19 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
20 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * Enriches {@link SyncReactorFutureDecorator} with state compression.
29  */
30 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
31
32     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
33
34     @GuardedBy("compressionGuard")
35     private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
36     private final Semaphore compressionGuard = new Semaphore(1, false);
37
38     public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
39         super(delegate, executorService);
40     }
41
42     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
43                                             final SyncupEntry syncupEntry) throws InterruptedException {
44         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
45         LOG.trace("syncup zip decorator: {}", nodeId.getValue());
46
47         try {
48             compressionGuard.acquire();
49
50             final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
51             if (newFutureNecessary) {
52                 super.syncup(flowcapableNodePath, syncupEntry);
53             }
54             return Futures.immediateFuture(true);
55         } finally {
56             compressionGuard.release();
57         }
58     }
59
60     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
61                                                          final SyncupEntry syncupEntry) throws InterruptedException {
62         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
63         LOG.trace("doSyncupInFuture zip decorator: {}", nodeId.getValue());
64
65         final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
66         if (lastCompressionState == null) {
67             return Futures.immediateFuture(true);
68         } else {
69             return super.doSyncupInFuture(flowcapableNodePath, lastCompressionState);
70         }
71     }
72
73     /**
74      * If there is config delta in compression queue for the device and new configuration is coming,
75      * update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
76      */
77     private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
78                                            final SyncupEntry syncupEntry) {
79         final SyncupEntry previousEntry = compressionQueue.get(flowcapableNodePath);
80
81         if (previousEntry != null && syncupEntry.isOptimizedConfigDelta() && previousEntry.isOptimizedConfigDelta()) {
82             updateOptimizedConfigDelta(flowcapableNodePath, syncupEntry, previousEntry);
83         } else {
84             compressionQueue.put(flowcapableNodePath, syncupEntry);
85         }
86         return previousEntry == null;
87     }
88
89     private void updateOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, SyncupEntry actual,
90                                             SyncupEntry previous) {
91         compressionQueue.put(flowcapableNodePath, new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
92                                                                   previous.getBefore(), previous.getDsTypeBefore()));
93     }
94
95     private SyncupEntry removeLastCompressionState(
96             final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
97         try {
98             try {
99                 compressionGuard.acquire();
100             } catch (InterruptedException e) {
101                 return null;
102             }
103
104             return compressionQueue.remove(flowcapableNodePath);
105         } finally {
106             compressionGuard.release();
107         }
108     }
109
110 }