Merge "Bug 5974: He plugin: Don't invalidate session context that is not valid."
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SyncReactorFutureDecorator.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.ListeningExecutorService;
13 import java.util.concurrent.Callable;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.TimeoutException;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
18 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
21 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Decorator for running delegate syncup in Future.
27  */
28 public class SyncReactorFutureDecorator implements SyncReactor {
29
30     private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);
31     public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";
32     private final SyncReactor delegate;
33     private final ListeningExecutorService executorService;
34
35     public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
36         this.delegate = delegate;
37         this.executorService = executorService;
38     }
39
40     public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
41                                             final FlowCapableNode configTree, final FlowCapableNode operationalTree,
42                                             final LogicalDatastoreType dsType) throws InterruptedException {
43         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
44         LOG.trace("syncup future {}", nodeId.getValue());
45
46         final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {
47             public Boolean call() throws Exception {
48                 final String oldThreadName = updateThreadName(nodeId);
49
50                 try {
51                     final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree, dsType)
52                             .get(10000, TimeUnit.MILLISECONDS);
53                     LOG.trace("ret {} {}", nodeId.getValue(), ret);
54                     return true;
55                 } catch (TimeoutException e) {
56                     LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);
57                     return false;
58                 } finally {
59                     updateThreadName(oldThreadName);
60                 }
61             }
62         });
63
64         return syncup;
65     }
66
67     protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
68                                                          final FlowCapableNode configTree, final FlowCapableNode operationalTree,
69                                                          final LogicalDatastoreType dsType) throws InterruptedException {
70         final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
71         LOG.trace("doSyncupInFuture future {}", nodeId.getValue());
72
73         return delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
74     }
75
76     private String updateThreadName(NodeId nodeId) {
77         final Thread currentThread = Thread.currentThread();
78         final String oldName = currentThread.getName();
79         try {
80             if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
81                 currentThread.setName(oldName + "@" + nodeId.getValue());
82             } else {
83                 LOG.warn("try to update foreign thread name {} {}", nodeId, oldName);
84             }
85         } catch (Exception e) {
86             LOG.error("failed updating threadName {}", nodeId, e);
87         }
88         return oldName;
89     }
90
91     private String updateThreadName(String name) {
92         final Thread currentThread = Thread.currentThread();
93         final String oldName = currentThread.getName();
94         try {
95             if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
96                 currentThread.setName(name);
97             } else {
98                 LOG.warn("try to update foreign thread name {} {}", oldName, name);
99             }
100         } catch (Exception e) {
101             LOG.error("failed updating threadName {}", name, e);
102         }
103         return oldName;
104     }
105 }