Merge "BUG-2771: Converting String.split() to Guava Splitter for Inventory Utils"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorGuardDecorator.java
1 /**\r
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 \r
9 package org.opendaylight.openflowplugin.applications.frsync.impl;\r
10 \r
11 import com.google.common.base.Preconditions;\r
12 import com.google.common.util.concurrent.FutureCallback;\r
13 import com.google.common.util.concurrent.Futures;\r
14 import com.google.common.util.concurrent.ListenableFuture;\r
15 import java.util.concurrent.Semaphore;\r
16 import java.util.concurrent.TimeUnit;\r
17 import javax.annotation.Nullable;\r
18 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
19 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
20 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
24 import org.slf4j.Logger;\r
25 import org.slf4j.LoggerFactory;\r
26 \r
27 /**\r
28  * Decorator for NodeId level syncup locking.\r
29  */\r
30 public class SyncReactorGuardDecorator implements SyncReactor {\r
31 \r
32     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
33 \r
34     private final SyncReactor delegate;\r
35     private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
36 \r
37     public SyncReactorGuardDecorator(SyncReactor delegate,\r
38             SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
39         this.delegate = delegate;\r
40         this.semaphoreKeeper = semaphoreKeeper;\r
41     }\r
42 \r
43     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
44             final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
45         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
46         LOG.trace("syncup {}", nodeId.getValue());\r
47 \r
48         final long stampBeforeGuard = System.nanoTime();\r
49         final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
50 \r
51         try {\r
52             final long stampAfterGuard = System.nanoTime();\r
53             if (LOG.isDebugEnabled()) {\r
54                 LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
55                         formatNanos(stampAfterGuard - stampBeforeGuard),\r
56                         guard, threadName());\r
57             }\r
58             \r
59             final ListenableFuture<Boolean> endResult =\r
60                     delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
61             \r
62             Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
63                 @Override\r
64                 public void onSuccess(@Nullable final Boolean result) {\r
65                     if (LOG.isDebugEnabled()) {\r
66                         final long stampFinished = System.nanoTime();\r
67                         LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
68                                 formatNanos(stampFinished - stampBeforeGuard),\r
69                                 formatNanos(stampFinished - stampAfterGuard),\r
70                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
71                                 guard, threadName());\r
72                     }\r
73 \r
74                     releaseGuardForNodeId(nodeId, guard);\r
75                 }\r
76 \r
77                 @Override\r
78                 public void onFailure(final Throwable t) {\r
79                     if (LOG.isDebugEnabled()) {\r
80                         final long stampFinished = System.nanoTime();\r
81                         LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
82                                 formatNanos(stampFinished - stampBeforeGuard),\r
83                                 formatNanos(stampFinished - stampAfterGuard),\r
84                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
85                                 guard, threadName());\r
86                     }\r
87 \r
88                     releaseGuardForNodeId(nodeId, guard);\r
89                 }\r
90             });\r
91             return endResult;\r
92         } catch(InterruptedException e) {\r
93             releaseGuardForNodeId(nodeId, guard);\r
94             throw e;\r
95         }\r
96     }\r
97 \r
98     protected String formatNanos(long nanos) {\r
99         return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
100     }\r
101 \r
102     /**\r
103      * get guard\r
104      *\r
105      * @param flowcapableNodePath II of node for which guard should be acquired\r
106      * @return semaphore guard\r
107      */\r
108     protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
109             throws InterruptedException {\r
110         final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
111                 "no guard for " + flowcapableNodePath);\r
112 \r
113         if (LOG.isDebugEnabled()) {\r
114             final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
115             try {\r
116                 LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
117             } catch (Exception e) {\r
118                 LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
119             }\r
120         }\r
121 \r
122         guard.acquire();\r
123         return guard;\r
124     }\r
125 \r
126     /**\r
127      * unlock per node\r
128      *\r
129      * @param nodeId NodeId of node which should be unlocked\r
130      * @param guard semaphore guard\r
131      */\r
132     protected void releaseGuardForNodeId(final NodeId nodeId, final Semaphore guard) {\r
133         if (guard == null) {\r
134             return;\r
135         }\r
136         guard.release();\r
137     }\r
138 \r
139     static String threadName() {\r
140         final Thread currentThread = Thread.currentThread();\r
141         return currentThread.getName();\r
142     }\r
143 \r
144 }\r