Merge "Bug 6050 - TcpSrcCodec is not maskable"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorGuardDecorator.java
1 /**\r
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 \r
9 package org.opendaylight.openflowplugin.applications.frsync.impl;\r
10 \r
11 import com.google.common.base.Preconditions;\r
12 import com.google.common.util.concurrent.FutureCallback;\r
13 import com.google.common.util.concurrent.Futures;\r
14 import com.google.common.util.concurrent.ListenableFuture;\r
15 import java.util.concurrent.Semaphore;\r
16 import java.util.concurrent.TimeUnit;\r
17 import javax.annotation.Nullable;\r
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
19 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
20 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
21 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
24 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
25 import org.slf4j.Logger;\r
26 import org.slf4j.LoggerFactory;\r
27 \r
28 /**\r
29  * Decorator for NodeId level syncup locking.\r
30  */\r
31 public class SyncReactorGuardDecorator implements SyncReactor {\r
32 \r
33     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
34 \r
35     private final SyncReactor delegate;\r
36     private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
37 \r
38     public SyncReactorGuardDecorator(SyncReactor delegate,\r
39             SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
40         this.delegate = delegate;\r
41         this.semaphoreKeeper = semaphoreKeeper;\r
42     }\r
43 \r
44     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
45                                             final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
46                                             final LogicalDatastoreType dsType) throws InterruptedException {\r
47         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
48         LOG.trace("syncup {}", nodeId.getValue());\r
49 \r
50         final long stampBeforeGuard = System.nanoTime();\r
51         final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
52 \r
53         try {\r
54             final long stampAfterGuard = System.nanoTime();\r
55             if (LOG.isDebugEnabled()) {\r
56                 LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
57                         formatNanos(stampAfterGuard - stampBeforeGuard),\r
58                         guard, threadName());\r
59             }\r
60 \r
61             final ListenableFuture<Boolean> endResult =\r
62                     delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException\r
63 \r
64             Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
65                 @Override\r
66                 public void onSuccess(@Nullable final Boolean result) {\r
67                     if (LOG.isDebugEnabled()) {\r
68                         final long stampFinished = System.nanoTime();\r
69                         LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
70                                 formatNanos(stampFinished - stampBeforeGuard),\r
71                                 formatNanos(stampFinished - stampAfterGuard),\r
72                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
73                                 guard, threadName());\r
74                     }\r
75 \r
76                     releaseGuardForNodeId(nodeId, guard);\r
77                 }\r
78 \r
79                 @Override\r
80                 public void onFailure(final Throwable t) {\r
81                     if (LOG.isDebugEnabled()) {\r
82                         final long stampFinished = System.nanoTime();\r
83                         LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
84                                 formatNanos(stampFinished - stampBeforeGuard),\r
85                                 formatNanos(stampFinished - stampAfterGuard),\r
86                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
87                                 guard, threadName());\r
88                     }\r
89 \r
90                     releaseGuardForNodeId(nodeId, guard);\r
91                 }\r
92             });\r
93             return endResult;\r
94         } catch(InterruptedException e) {\r
95             releaseGuardForNodeId(nodeId, guard);\r
96             throw e;\r
97         }\r
98     }\r
99 \r
100     protected String formatNanos(long nanos) {\r
101         return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
102     }\r
103 \r
104     /**\r
105      * get guard\r
106      *\r
107      * @param flowcapableNodePath II of node for which guard should be acquired\r
108      * @return semaphore guard\r
109      */\r
110     protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
111             throws InterruptedException {\r
112         final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
113                 "no guard for " + flowcapableNodePath);\r
114 \r
115         if (LOG.isDebugEnabled()) {\r
116             final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
117             try {\r
118                 LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
119             } catch (Exception e) {\r
120                 LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
121             }\r
122         }\r
123 \r
124         guard.acquire();\r
125         return guard;\r
126     }\r
127 \r
128     /**\r
129      * unlock per node\r
130      *\r
131      * @param nodeId NodeId of node which should be unlocked\r
132      * @param guard semaphore guard\r
133      */\r
134     protected void releaseGuardForNodeId(final NodeId nodeId, final Semaphore guard) {\r
135         if (guard == null) {\r
136             return;\r
137         }\r
138         guard.release();\r
139     }\r
140 \r
141     static String threadName() {\r
142         final Thread currentThread = Thread.currentThread();\r
143         return currentThread.getName();\r
144     }\r
145 \r
146 }\r