ca0e91976260ed811aaea50b063f990af66c0eae
[transportpce.git] / common / src / main / java / org / opendaylight / transportpce / common / device / DeviceTransactionManagerImpl.java
1 /*
2  * Copyright © 2017 Orange, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.transportpce.common.device;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16
17 import java.util.Optional;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
30 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.transportpce.common.InstanceIdentifiers;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42 public class DeviceTransactionManagerImpl implements DeviceTransactionManager {
43
44     // TODO cache device data brokers
45     // TODO remove disconnected devices from maps
46
47     private static final Logger LOG = LoggerFactory.getLogger(DeviceTransactionManagerImpl.class);
48     private static final int NUMBER_OF_THREADS = 4;
49     private static final long GET_DATA_SUBMIT_TIMEOUT = 3000;
50     private static final TimeUnit GET_DATA_SUBMIT_TIME_UNIT = TimeUnit.MILLISECONDS;
51
52     private final MountPointService mountPointService;
53     private final ScheduledExecutorService checkingExecutor;
54     private final ListeningExecutorService listeningExecutor;
55     private final ConcurrentMap<String, CountDownLatch> deviceLocks;
56     private final long maxDurationToSubmitTransaction; // TODO set reasonable value in blueprint
57     private final TimeUnit maxDurationToSubmitTransactionTimeUnit = TimeUnit.MILLISECONDS;
58
59     public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction) {
60         this.mountPointService = mountPointService;
61         this.maxDurationToSubmitTransaction = maxDurationToSubmitTransaction;
62         this.deviceLocks = new ConcurrentHashMap<>();
63         this.checkingExecutor = Executors.newScheduledThreadPool(NUMBER_OF_THREADS);
64         this.listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUMBER_OF_THREADS));
65     }
66
67     @Override
68     public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId) {
69         return getDeviceTransaction(deviceId, maxDurationToSubmitTransaction, maxDurationToSubmitTransactionTimeUnit);
70     }
71
72     @Override
73     public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId, long timeoutToSubmit,
74             TimeUnit timeUnit) {
75         CountDownLatch newLock = new CountDownLatch(1);
76         ListenableFuture<Optional<DeviceTransaction>> future = listeningExecutor.submit(() -> {
77             LOG.debug("Starting creation of transaction for device {}.", deviceId);
78             // get current lock from device and set new lock
79             CountDownLatch actualLock = swapActualLock(deviceId, newLock);
80             if (actualLock != null) {
81                 // if lock was present on device wait until it unlocks
82                 actualLock.await();
83             }
84
85             Optional<DataBroker> deviceDataBrokerOpt = getDeviceDataBroker(deviceId);
86             DataBroker deviceDataBroker;
87             if (deviceDataBrokerOpt.isPresent()) {
88                 deviceDataBroker = deviceDataBrokerOpt.get();
89             } else {
90                 newLock.countDown();
91                 return Optional.empty();
92             }
93             LOG.debug("Created transaction for device {}.", deviceId);
94             return Optional.of(new DeviceTransaction(deviceDataBroker.newReadWriteTransaction(), newLock));
95         });
96
97         Futures.addCallback(future, new FutureCallback<Optional<DeviceTransaction>>() {
98             @Override
99             public void onSuccess(Optional<DeviceTransaction> deviceTransactionOptional) {
100                 // creates timeout for transaction to submit right after transaction is created
101                 // if time will run out and transaction was not closed then it will be cancelled (and unlocked)
102                 checkingExecutor.schedule(() -> {
103                     if (deviceTransactionOptional.isPresent()) {
104                         DeviceTransaction deviceTx = deviceTransactionOptional.get();
105                         LOG.debug("Timeout to submit transaction run out! Transaction was {} submitted or canceled.",
106                                 deviceTx.wasSubmittedOrCancelled().get() ? "" : "not");
107                         if (!deviceTx.wasSubmittedOrCancelled().get()) {
108                             LOG.error(String.format("Transaction for node %s was not submitted or canceled after %s"
109                                             + " milliseconds! Cancelling transaction!", deviceId,
110                                     timeoutToSubmit));
111                             deviceTx.cancel();
112                         }
113                     }
114                 }, timeoutToSubmit, timeUnit);
115             }
116
117             @Override
118             public void onFailure(Throwable throwable) {
119                 LOG.error("Exception thrown while getting device transaction for device {}! Unlocking device.",
120                         deviceId, throwable);
121                 newLock.countDown();
122             }
123         }, checkingExecutor);
124
125         return future;
126     }
127
128     private synchronized CountDownLatch swapActualLock(String deviceId, CountDownLatch newLock) {
129         return deviceLocks.put(deviceId, newLock);
130     }
131
132     private Optional<DataBroker> getDeviceDataBroker(String deviceId) {
133         Optional<MountPoint> netconfNode = getDeviceMountPoint(deviceId);
134         if (netconfNode.isPresent()) {
135             return netconfNode.get().getService(DataBroker.class).toJavaUtil();
136         } else {
137             LOG.error("Device mount point not found for : {}", deviceId);
138             return Optional.empty();
139         }
140     }
141
142     @Override
143     public Optional<MountPoint> getDeviceMountPoint(String deviceId) {
144         InstanceIdentifier<Node> netconfNodeIID = InstanceIdentifiers.NETCONF_TOPOLOGY_II.child(Node.class,
145                 new NodeKey(new NodeId(deviceId)));
146         return mountPointService.getMountPoint(netconfNodeIID).toJavaUtil();
147     }
148
149     @Override
150     public <T extends DataObject> Optional<T> getDataFromDevice(String deviceId,
151             LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<T> path, long timeout, TimeUnit timeUnit) {
152         Optional<DeviceTransaction> deviceTxOpt;
153         try {
154             deviceTxOpt = getDeviceTransaction(deviceId, timeout, timeUnit).get();
155         } catch (InterruptedException | ExecutionException e) {
156             LOG.error("Exception thrown while getting transaction for device {}!", deviceId, e);
157             return Optional.empty();
158         }
159         if (deviceTxOpt.isPresent()) {
160             DeviceTransaction deviceTx = deviceTxOpt.get();
161             try {
162                 return deviceTx.read(logicalDatastoreType, path).get(timeout, timeUnit).toJavaUtil();
163             } catch (InterruptedException | ExecutionException | TimeoutException e) {
164                 LOG.error("Exception thrown while reading data from device {}! IID: {}", deviceId, path, e);
165             } finally {
166                 deviceTx.submit(GET_DATA_SUBMIT_TIMEOUT, GET_DATA_SUBMIT_TIME_UNIT);
167             }
168         } else {
169             LOG.error("Could not obtain transaction for device {}!", deviceId);
170         }
171         return Optional.empty();
172     }
173
174     @Override
175     public boolean isDeviceMounted(String deviceId) {
176         return getDeviceDataBroker(deviceId).isPresent();
177     }
178
179     public void preDestroy() {
180         checkingExecutor.shutdown();
181         listeningExecutor.shutdown();
182     }
183
184     public long getMaxDurationToSubmitTransaction() {
185         return maxDurationToSubmitTransaction;
186     }
187 }