Merge "Bug 5974: He plugin: Don't invalidate session context that is not valid."
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureZipDecorator.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.concurrent.Semaphore;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
19 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
20 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
21 import org.opendaylight.openflowplugin.applications.frsync.util.ZipQueueEntry;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
24 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Enriches {@link SyncReactorFutureDecorator} with state compression.
30  */
31 public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
32
33     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
34
35     @GuardedBy("compressionGuard")
36     private final Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> compressionQueue = new HashMap<>();
37     private final Semaphore compressionGuard = new Semaphore(1, false);
38
39     public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
40         super(delegate, executorService);
41     }
42
43     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
44                                             final FlowCapableNode configTree, final FlowCapableNode operationalTree,
45                                             final LogicalDatastoreType dsType) throws InterruptedException {
46         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
47         LOG.trace("syncup zip {}", nodeId.getValue());
48
49         try {
50             compressionGuard.acquire();
51
52             final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree, dsType);
53             if (newFutureNecessary) {
54                 super.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
55             }
56             return Futures.immediateFuture(true);
57         } finally {
58             compressionGuard.release();
59         }
60     }
61
62     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
63                                                          final FlowCapableNode configTree, final FlowCapableNode operationalTree,
64                                                          final LogicalDatastoreType dsType) throws InterruptedException {
65         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
66         LOG.trace("doSyncupInFuture zip {}", nodeId.getValue());
67
68         final ZipQueueEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
69         if (lastCompressionState == null) {
70             return Futures.immediateFuture(true);
71         } else {
72             return super.doSyncupInFuture(flowcapableNodePath,
73                     lastCompressionState.getLeft(), lastCompressionState.getRight(), dsType);
74         }
75     }
76
77     /**
78      * If there is config delta in compression queue for the device and new configuration is coming,
79      * update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
80      */
81     private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
82                                            final FlowCapableNode configTree, final FlowCapableNode operationalTree,
83                                            final LogicalDatastoreType dsType) {
84         final ZipQueueEntry previousEntry = compressionQueue.get(flowcapableNodePath);
85
86         if (previousEntry != null && dsType == LogicalDatastoreType.CONFIGURATION
87                 && previousEntry.getDsType() == LogicalDatastoreType.CONFIGURATION) {
88             putOptimizedConfigDelta(flowcapableNodePath, configTree, previousEntry);
89         } else {
90             putLatestOperationalDelta(flowcapableNodePath, configTree, operationalTree, dsType);
91         }
92         return previousEntry == null;
93     }
94
95     private void putOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
96                                          ZipQueueEntry previous) {
97         compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, previous.getRight(), previous.getDsType()));
98     }
99
100     private void putLatestOperationalDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
101                                            FlowCapableNode operationalTree, LogicalDatastoreType dsType) {
102         compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, operationalTree, dsType));
103     }
104
105     private ZipQueueEntry removeLastCompressionState(
106             final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
107         try {
108             try {
109                 compressionGuard.acquire();
110             } catch (InterruptedException e) {
111                 return null;
112             }
113
114             return compressionQueue.remove(flowcapableNodePath);
115         } finally {
116             compressionGuard.release();
117         }
118     }
119
120 }