Merge "Remove M2E lifecycle-mapping <execute/> for yang-maven-plugin"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorGuardDecorator.java
1 /**\r
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 \r
9 package org.opendaylight.openflowplugin.applications.frsync.impl;\r
10 \r
11 import java.util.concurrent.Semaphore;\r
12 import java.util.concurrent.TimeUnit;\r
13 \r
14 import javax.annotation.Nullable;\r
15 \r
16 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
17 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
18 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
21 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
22 import org.slf4j.Logger;\r
23 import org.slf4j.LoggerFactory;\r
24 \r
25 import com.google.common.base.Preconditions;\r
26 import com.google.common.util.concurrent.FutureCallback;\r
27 import com.google.common.util.concurrent.Futures;\r
28 import com.google.common.util.concurrent.ListenableFuture;\r
29 \r
30 /**\r
31  * Decorator for NodeId level syncup locking.\r
32  */\r
33 public class SyncReactorGuardDecorator implements SyncReactor {\r
34 \r
35     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
36 \r
37     private final SyncReactor delegate;\r
38     private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
39 \r
40     public SyncReactorGuardDecorator(SyncReactor delegate,\r
41             SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
42         this.delegate = delegate;\r
43         this.semaphoreKeeper = semaphoreKeeper;\r
44     }\r
45 \r
46     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
47             final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
48         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
49         LOG.trace("syncup {}", nodeId.getValue());\r
50 \r
51         final long stampBeforeGuard = System.nanoTime();\r
52         final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
53 \r
54         try {\r
55             final long stampAfterGuard = System.nanoTime();\r
56             if (LOG.isDebugEnabled()) {\r
57                 LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
58                         formatNanos(stampAfterGuard - stampBeforeGuard),\r
59                         guard, threadName());\r
60             }\r
61             \r
62             final ListenableFuture<Boolean> endResult =\r
63                     delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
64             \r
65             Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
66                 @Override\r
67                 public void onSuccess(@Nullable final Boolean result) {\r
68                     if (LOG.isDebugEnabled()) {\r
69                         final long stampFinished = System.nanoTime();\r
70                         LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
71                                 formatNanos(stampFinished - stampBeforeGuard),\r
72                                 formatNanos(stampFinished - stampAfterGuard),\r
73                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
74                                 guard, threadName());\r
75                     }\r
76                     \r
77                     lockReleaseForNodeId(nodeId, guard);\r
78                 }\r
79                 \r
80                 @Override\r
81                 public void onFailure(final Throwable t) {\r
82                     if (LOG.isDebugEnabled()) {\r
83                         final long stampFinished = System.nanoTime();\r
84                         LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
85                                 formatNanos(stampFinished - stampBeforeGuard),\r
86                                 formatNanos(stampFinished - stampAfterGuard),\r
87                                 formatNanos(stampAfterGuard - stampBeforeGuard),\r
88                                 guard, threadName());\r
89                     }\r
90                     \r
91                     lockReleaseForNodeId(nodeId, guard);\r
92                 }\r
93             });\r
94             return endResult;\r
95         } catch(InterruptedException e) {\r
96             lockReleaseForNodeId(nodeId, guard);\r
97             throw e;\r
98         }\r
99     }\r
100 \r
101     protected String formatNanos(long nanos) {\r
102         return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
103     }\r
104 \r
105     /**\r
106      * get guard\r
107      *\r
108      * @param flowcapableNodePath\r
109      * @return\r
110      */\r
111     protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
112             throws InterruptedException {\r
113         final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
114                 "no guard for " + flowcapableNodePath);\r
115 \r
116         if (LOG.isDebugEnabled()) {\r
117             final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
118             try {\r
119                 LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
120             } catch (Exception e) {\r
121                 LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
122             }\r
123         }\r
124 \r
125         guard.acquire();\r
126         return guard;\r
127     }\r
128 \r
129     /**\r
130      * unlock per node\r
131      *\r
132      * @param nodeId\r
133      * @param guard\r
134      */\r
135     protected void lockReleaseForNodeId(final NodeId nodeId,\r
136             final Semaphore guard) {\r
137         if (guard == null) {\r
138             return;\r
139         }\r
140         guard.release();\r
141     }\r
142 \r
143     static String threadName() {\r
144         final Thread currentThread = Thread.currentThread();\r
145         return currentThread.getName();\r
146     }\r
147 \r
148 }\r