2 * Copyright © 2017 Orange, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.transportpce.common.device;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Optional;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.MountPoint;
30 import org.opendaylight.mdsal.binding.api.MountPointService;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.transportpce.common.InstanceIdentifiers;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 public class DeviceTransactionManagerImpl implements DeviceTransactionManager {
44 // TODO cache device data brokers
45 // TODO remove disconnected devices from maps
47 private static final Logger LOG = LoggerFactory.getLogger(DeviceTransactionManagerImpl.class);
48 private static final int NUMBER_OF_THREADS = 4;
49 private static final long GET_DATA_SUBMIT_TIMEOUT = 3000;
50 private static final TimeUnit GET_DATA_SUBMIT_TIME_UNIT = TimeUnit.MILLISECONDS;
51 private static final TimeUnit MAX_DURATION_TO_SUBMIT_TIMEUNIT = TimeUnit.MILLISECONDS;
53 private final MountPointService mountPointService;
54 private final ScheduledExecutorService checkingExecutor;
55 private final ListeningExecutorService listeningExecutor;
56 private final ConcurrentMap<String, CountDownLatch> deviceLocks;
57 // TODO set reasonable value in blueprint for maxDurationToSubmitTransaction
58 private final long maxDurationToSubmitTransaction;
60 public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction) {
61 this.mountPointService = mountPointService;
62 this.maxDurationToSubmitTransaction = maxDurationToSubmitTransaction;
63 this.deviceLocks = new ConcurrentHashMap<>();
64 this.checkingExecutor = Executors.newScheduledThreadPool(NUMBER_OF_THREADS);
65 this.listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUMBER_OF_THREADS));
69 public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId) {
70 return getDeviceTransaction(deviceId, maxDurationToSubmitTransaction, MAX_DURATION_TO_SUBMIT_TIMEUNIT);
74 public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId, long timeoutToSubmit,
76 CountDownLatch newLock = new CountDownLatch(1);
77 ListenableFuture<Optional<DeviceTransaction>> future = listeningExecutor.submit(() -> {
78 LOG.debug("Starting creation of transaction for device {}.", deviceId);
79 // get current lock from device and set new lock
80 CountDownLatch actualLock = swapActualLock(deviceId, newLock);
81 if (actualLock != null) {
82 // if lock was present on device wait until it unlocks
86 Optional<DataBroker> deviceDataBrokerOpt = getDeviceDataBroker(deviceId);
87 DataBroker deviceDataBroker;
88 if (deviceDataBrokerOpt.isPresent()) {
89 deviceDataBroker = deviceDataBrokerOpt.get();
92 return Optional.empty();
94 LOG.debug("Created transaction for device {}.", deviceId);
95 return Optional.of(new DeviceTransaction(deviceDataBroker.newReadWriteTransaction(), newLock));
98 Futures.addCallback(future, new FutureCallback<Optional<DeviceTransaction>>() {
100 public void onSuccess(Optional<DeviceTransaction> deviceTransactionOptional) {
101 // creates timeout for transaction to submit right after transaction is created
102 // if time will run out and transaction was not closed then it will be cancelled (and unlocked)
103 checkingExecutor.schedule(() -> {
104 if (deviceTransactionOptional.isPresent()) {
105 DeviceTransaction deviceTx = deviceTransactionOptional.get();
106 LOG.debug("Timeout to submit transaction run out! Transaction was {} submitted or canceled.",
107 deviceTx.wasSubmittedOrCancelled().get() ? "" : "not");
108 if (!deviceTx.wasSubmittedOrCancelled().get()) {
109 LOG.error(String.format("Transaction for node %s was not submitted or canceled after %s"
110 + " milliseconds! Cancelling transaction!", deviceId,
115 }, timeoutToSubmit, timeUnit);
119 public void onFailure(Throwable throwable) {
120 LOG.error("Exception thrown while getting device transaction for device {}! Unlocking device.",
121 deviceId, throwable);
124 }, checkingExecutor);
129 private synchronized CountDownLatch swapActualLock(String deviceId, CountDownLatch newLock) {
130 return deviceLocks.put(deviceId, newLock);
133 private Optional<DataBroker> getDeviceDataBroker(String deviceId) {
134 Optional<MountPoint> netconfNode = getDeviceMountPoint(deviceId);
135 if (netconfNode.isPresent()) {
136 return netconfNode.get().getService(DataBroker.class);
138 LOG.error("Device mount point not found for : {}", deviceId);
139 return Optional.empty();
144 public Optional<MountPoint> getDeviceMountPoint(String deviceId) {
145 InstanceIdentifier<Node> netconfNodeIID = InstanceIdentifiers.NETCONF_TOPOLOGY_II.child(Node.class,
146 new NodeKey(new NodeId(deviceId)));
147 return mountPointService.getMountPoint(netconfNodeIID);
151 public <T extends DataObject> Optional<T> getDataFromDevice(String deviceId,
152 LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<T> path, long timeout, TimeUnit timeUnit) {
153 Optional<DeviceTransaction> deviceTxOpt;
155 deviceTxOpt = getDeviceTransaction(deviceId, timeout, timeUnit).get();
156 } catch (InterruptedException | ExecutionException e) {
157 LOG.error("Exception thrown while getting transaction for device {}!", deviceId, e);
158 return Optional.empty();
160 if (deviceTxOpt.isPresent()) {
161 DeviceTransaction deviceTx = deviceTxOpt.get();
163 return deviceTx.read(logicalDatastoreType, path).get(timeout, timeUnit);
164 } catch (InterruptedException | ExecutionException | TimeoutException e) {
165 LOG.error("Exception thrown while reading data from device {}! IID: {}", deviceId, path, e);
167 deviceTx.commit(GET_DATA_SUBMIT_TIMEOUT, GET_DATA_SUBMIT_TIME_UNIT);
170 LOG.error("Could not obtain transaction for device {}!", deviceId);
172 return Optional.empty();
176 public boolean isDeviceMounted(String deviceId) {
177 return getDeviceDataBroker(deviceId).isPresent();
180 public void preDestroy() {
181 checkingExecutor.shutdown();
182 listeningExecutor.shutdown();
185 public long getMaxDurationToSubmitTransaction() {
186 return maxDurationToSubmitTransaction;