Merge changes from topic 'blueprint'
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureDecorator.java
1 /**\r
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 \r
9 package org.opendaylight.openflowplugin.applications.frsync.impl;\r
10 \r
11 import com.google.common.util.concurrent.ListenableFuture;\r
12 import com.google.common.util.concurrent.ListeningExecutorService;\r
13 import java.util.concurrent.Callable;\r
14 import java.util.concurrent.TimeUnit;\r
15 import java.util.concurrent.TimeoutException;\r
16 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
17 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
21 import org.slf4j.Logger;\r
22 import org.slf4j.LoggerFactory;\r
23 \r
24 /**\r
25  * Decorator for running delegate syncup in Future.\r
26  */\r
27 public class SyncReactorFutureDecorator implements SyncReactor {\r
28 \r
29     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);\r
30 \r
31     private final SyncReactor delegate;\r
32     private final ListeningExecutorService executorService;\r
33 \r
34     public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";\r
35 \r
36     public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
37         this.delegate = delegate;\r
38         this.executorService = executorService;\r
39     }\r
40 \r
41     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
42             final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
43         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
44         LOG.trace("syncup {}", nodeId.getValue());\r
45 \r
46         final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {\r
47             public Boolean call() throws Exception {\r
48                 final String oldThreadName = updateThreadName(nodeId);\r
49 \r
50                 try {\r
51                     final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree)\r
52                             .get(10000, TimeUnit.MILLISECONDS);\r
53                     LOG.trace("ret {} {}", nodeId.getValue(), ret);\r
54                     return true;\r
55                 } catch (TimeoutException e) {\r
56                     LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);\r
57                     return false;\r
58                 } finally {\r
59                     updateThreadName(oldThreadName);\r
60                 }\r
61             }\r
62         });\r
63         \r
64         return syncup;\r
65     }\r
66 \r
67     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
68             final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
69                     throws InterruptedException {\r
70         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
71         LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
72 \r
73         return delegate.syncup(flowcapableNodePath, configTree, operationalTree);\r
74     }\r
75 \r
76     static String threadName() {\r
77         final Thread currentThread = Thread.currentThread();\r
78         return currentThread.getName();\r
79     }\r
80 \r
81     protected String updateThreadName(NodeId nodeId) {\r
82         final Thread currentThread = Thread.currentThread();\r
83         final String oldName = currentThread.getName();\r
84         try {\r
85             if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
86                 currentThread.setName(oldName + "@" + nodeId.getValue());\r
87             } else {\r
88                 LOG.warn("try to update foreign thread name {} {}", nodeId, oldName);\r
89             }\r
90         } catch (Exception e) {\r
91             LOG.error("failed updating threadName {}", nodeId, e);\r
92         }\r
93         return oldName;\r
94     }\r
95 \r
96     protected String updateThreadName(String name) {\r
97         final Thread currentThread = Thread.currentThread();\r
98         final String oldName = currentThread.getName();\r
99         try {\r
100             if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
101                 currentThread.setName(name);\r
102             } else {\r
103                 LOG.warn("try to update foreign thread name {} {}", oldName, name);\r
104             }\r
105         } catch (Exception e) {\r
106             LOG.error("failed updating threadName {}", name, e);\r
107         }\r
108         return oldName;\r
109     }\r
110 }\r