fix import extra separations
[transportpce.git] / common / src / main / java / org / opendaylight / transportpce / common / device / DeviceTransactionManagerImpl.java
1 /*
2  * Copyright © 2017 Orange, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.transportpce.common.device;
10
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.Optional;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import org.opendaylight.mdsal.binding.api.DataBroker;
27 import org.opendaylight.mdsal.binding.api.MountPoint;
28 import org.opendaylight.mdsal.binding.api.MountPointService;
29 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
30 import org.opendaylight.transportpce.common.InstanceIdentifiers;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
32 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
34 import org.opendaylight.yangtools.yang.binding.DataObject;
35 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39
40 public class DeviceTransactionManagerImpl implements DeviceTransactionManager {
41
42     // TODO cache device data brokers
43     // TODO remove disconnected devices from maps
44
45     private static final Logger LOG = LoggerFactory.getLogger(DeviceTransactionManagerImpl.class);
46     private static final int NUMBER_OF_THREADS = 4;
47     private static final long GET_DATA_SUBMIT_TIMEOUT = 3000;
48     private static final TimeUnit GET_DATA_SUBMIT_TIME_UNIT = TimeUnit.MILLISECONDS;
49     private static final TimeUnit MAX_DURATION_TO_SUBMIT_TIMEUNIT = TimeUnit.MILLISECONDS;
50
51     private final MountPointService mountPointService;
52     private final ScheduledExecutorService checkingExecutor;
53     private final ListeningExecutorService listeningExecutor;
54     private final ConcurrentMap<String, CountDownLatch> deviceLocks;
55     // TODO set reasonable value in blueprint for maxDurationToSubmitTransaction
56     private final long maxDurationToSubmitTransaction;
57
58     public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction) {
59         this.mountPointService = mountPointService;
60         this.maxDurationToSubmitTransaction = maxDurationToSubmitTransaction;
61         this.deviceLocks = new ConcurrentHashMap<>();
62         this.checkingExecutor = Executors.newScheduledThreadPool(NUMBER_OF_THREADS);
63         this.listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUMBER_OF_THREADS));
64     }
65
66     @Override
67     public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId) {
68         return getDeviceTransaction(deviceId, maxDurationToSubmitTransaction, MAX_DURATION_TO_SUBMIT_TIMEUNIT);
69     }
70
71     @Override
72     public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId, long timeoutToSubmit,
73             TimeUnit timeUnit) {
74         CountDownLatch newLock = new CountDownLatch(1);
75         ListenableFuture<Optional<DeviceTransaction>> future = listeningExecutor.submit(() -> {
76             LOG.debug("Starting creation of transaction for device {}.", deviceId);
77             // get current lock from device and set new lock
78             CountDownLatch actualLock = swapActualLock(deviceId, newLock);
79             if (actualLock != null) {
80                 // if lock was present on device wait until it unlocks
81                 actualLock.await();
82             }
83
84             Optional<DataBroker> deviceDataBrokerOpt = getDeviceDataBroker(deviceId);
85             DataBroker deviceDataBroker;
86             if (deviceDataBrokerOpt.isPresent()) {
87                 deviceDataBroker = deviceDataBrokerOpt.get();
88             } else {
89                 newLock.countDown();
90                 return Optional.empty();
91             }
92             LOG.debug("Created transaction for device {}.", deviceId);
93             return Optional.of(new DeviceTransaction(deviceDataBroker.newReadWriteTransaction(), newLock));
94         });
95
96         Futures.addCallback(future, new FutureCallback<Optional<DeviceTransaction>>() {
97             @Override
98             public void onSuccess(Optional<DeviceTransaction> deviceTransactionOptional) {
99                 // creates timeout for transaction to submit right after transaction is created
100                 // if time will run out and transaction was not closed then it will be cancelled (and unlocked)
101                 checkingExecutor.schedule(() -> {
102                     if (deviceTransactionOptional.isPresent()) {
103                         DeviceTransaction deviceTx = deviceTransactionOptional.get();
104                         LOG.debug("Timeout to submit transaction run out! Transaction was {} submitted or canceled.",
105                                 deviceTx.wasSubmittedOrCancelled().get() ? "" : "not");
106                         if (!deviceTx.wasSubmittedOrCancelled().get()) {
107                             LOG.error(
108                                 "Transaction for node {} not submitted/canceled after {} ms. Cancelling transaction.",
109                                 deviceId, timeoutToSubmit);
110                             deviceTx.cancel();
111                         }
112                     }
113                 }, timeoutToSubmit, timeUnit);
114             }
115
116             @Override
117             public void onFailure(Throwable throwable) {
118                 LOG.error("Exception thrown while getting device transaction for device {}! Unlocking device.",
119                         deviceId, throwable);
120                 newLock.countDown();
121             }
122         }, checkingExecutor);
123
124         return future;
125     }
126
127     private synchronized CountDownLatch swapActualLock(String deviceId, CountDownLatch newLock) {
128         return deviceLocks.put(deviceId, newLock);
129     }
130
131     private Optional<DataBroker> getDeviceDataBroker(String deviceId) {
132         Optional<MountPoint> netconfNode = getDeviceMountPoint(deviceId);
133         if (netconfNode.isPresent()) {
134             return netconfNode.get().getService(DataBroker.class);
135         } else {
136             LOG.error("Device mount point not found for : {}", deviceId);
137             return Optional.empty();
138         }
139     }
140
141     @Override
142     public Optional<MountPoint> getDeviceMountPoint(String deviceId) {
143         InstanceIdentifier<Node> netconfNodeIID = InstanceIdentifiers.NETCONF_TOPOLOGY_II.child(Node.class,
144                 new NodeKey(new NodeId(deviceId)));
145         return mountPointService.getMountPoint(netconfNodeIID);
146     }
147
148     @Override
149     public <T extends DataObject> Optional<T> getDataFromDevice(String deviceId,
150             LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<T> path, long timeout, TimeUnit timeUnit) {
151         Optional<DeviceTransaction> deviceTxOpt;
152         try {
153             deviceTxOpt = getDeviceTransaction(deviceId, timeout, timeUnit).get();
154         } catch (InterruptedException | ExecutionException e) {
155             LOG.error("Exception thrown while getting transaction for device {}!", deviceId, e);
156             return Optional.empty();
157         }
158         if (deviceTxOpt.isPresent()) {
159             DeviceTransaction deviceTx = deviceTxOpt.get();
160             try {
161                 return deviceTx.read(logicalDatastoreType, path).get(timeout, timeUnit);
162             } catch (InterruptedException | ExecutionException | TimeoutException e) {
163                 LOG.error("Exception thrown while reading data from device {}! IID: {}", deviceId, path, e);
164             } finally {
165                 deviceTx.commit(GET_DATA_SUBMIT_TIMEOUT, GET_DATA_SUBMIT_TIME_UNIT);
166             }
167         } else {
168             LOG.error("Could not obtain transaction for device {}!", deviceId);
169         }
170         return Optional.empty();
171     }
172
173     @Override
174     public boolean isDeviceMounted(String deviceId) {
175         return getDeviceDataBroker(deviceId).isPresent();
176     }
177
178     public void preDestroy() {
179         checkingExecutor.shutdown();
180         listeningExecutor.shutdown();
181     }
182
183     public long getMaxDurationToSubmitTransaction() {
184         return maxDurationToSubmitTransaction;
185     }
186 }