Merge "Bug 6075 - Missing unregistration for Nshc1 codec in NiciraExtensionsRegistrator"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorGuardDecorator.java
1 /**\r
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 \r
9 package org.opendaylight.openflowplugin.applications.frsync.impl;\r
10 \r
11 import com.google.common.base.Preconditions;\r
12 import com.google.common.util.concurrent.FutureCallback;\r
13 import com.google.common.util.concurrent.Futures;\r
14 import com.google.common.util.concurrent.ListenableFuture;\r
15 import java.util.concurrent.Semaphore;\r
16 import java.util.concurrent.TimeUnit;\r
17 import javax.annotation.Nullable;\r
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
19 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
20 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
21 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
24 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
25 import org.slf4j.Logger;\r
26 import org.slf4j.LoggerFactory;\r
27 \r
28 /**\r
29  * Decorator for NodeId level syncup locking.\r
30  */\r
31 public class SyncReactorGuardDecorator implements SyncReactor {\r
32 \r
33     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
34 \r
35     private final SyncReactor delegate;\r
36     private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
37 \r
38     public SyncReactorGuardDecorator(SyncReactor delegate,\r
39             SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
40         this.delegate = delegate;\r
41         this.semaphoreKeeper = semaphoreKeeper;\r
42     }\r
43 \r
44     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
45                                             final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
46                                             final LogicalDatastoreType dsType) throws InterruptedException {\r
47         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
48         LOG.trace("syncup guard {}", nodeId.getValue());\r
49 \r
50         final long stampBeforeGuard = System.nanoTime();\r
51         final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
52 \r
53         try {\r
54             final long stampAfterGuard = System.nanoTime();\r
55             if (LOG.isDebugEnabled()) {\r
56                 LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
57                         formatNanos(stampAfterGuard - stampBeforeGuard),\r
58                         guard, threadName());\r
59             }\r
60 \r
61             final ListenableFuture<Boolean> endResult =\r
62                     delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException\r
63 \r
64             Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
65                 @Override\r
66                 public void onSuccess(@Nullable final Boolean result) {\r
67                     if (LOG.isDebugEnabled()) {\r
68                         final long stampFinished = System.nanoTime();\r
69                         LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
70                                 formatNanos(stampFinished - stampBeforeGuard),\r
71                                 formatNanos(stampFinished - stampAfterGuard),\r
72                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
73                                 guard.availablePermits(), threadName());\r
74                     }\r
75 \r
76                     releaseGuardForNodeId(guard);\r
77                 }\r
78 \r
79                 @Override\r
80                 public void onFailure(final Throwable t) {\r
81                     if (LOG.isDebugEnabled()) {\r
82                         final long stampFinished = System.nanoTime();\r
83                         LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
84                                 formatNanos(stampFinished - stampBeforeGuard),\r
85                                 formatNanos(stampFinished - stampAfterGuard),\r
86                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
87                                 guard.availablePermits(), threadName());\r
88                     }\r
89 \r
90                     releaseGuardForNodeId(guard);\r
91                 }\r
92             });\r
93             return endResult;\r
94         } catch (InterruptedException e) {\r
95             releaseGuardForNodeId(guard);\r
96             throw e;\r
97         }\r
98     }\r
99 \r
100     private String formatNanos(long nanos) {\r
101         return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
102     }\r
103 \r
104     /**\r
105      * Get guard and lock for node.\r
106      * @param flowcapableNodePath II of node for which guard should be acquired\r
107      * @return semaphore guard\r
108      */\r
109     private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
110             throws InterruptedException {\r
111         final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
112                 "no guard for " + flowcapableNodePath);\r
113 \r
114         if (LOG.isDebugEnabled()) {\r
115             final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
116             try {\r
117                 LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
118             } catch (Exception e) {\r
119                 LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
120             }\r
121         }\r
122 \r
123         guard.acquire();\r
124         return guard;\r
125     }\r
126 \r
127     /**\r
128      * Unlock and release guard.\r
129      * @param guard semaphore guard which should be unlocked\r
130      */\r
131     private void releaseGuardForNodeId(final Semaphore guard) {\r
132         if (guard == null) {\r
133             return;\r
134         }\r
135         guard.release();\r
136     }\r
137 \r
138     private static String threadName() {\r
139         final Thread currentThread = Thread.currentThread();\r
140         return currentThread.getName();\r
141     }\r
142 \r
143 }\r