2 * Copyright © 2017 Orange, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.transportpce.common.device;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Optional;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.MountPoint;
30 import org.opendaylight.mdsal.binding.api.MountPointService;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.transportpce.common.InstanceIdentifiers;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.osgi.service.component.annotations.Activate;
39 import org.osgi.service.component.annotations.Component;
40 import org.osgi.service.component.annotations.Deactivate;
41 import org.osgi.service.component.annotations.Reference;
42 import org.osgi.service.metatype.annotations.AttributeDefinition;
43 import org.osgi.service.metatype.annotations.Designate;
44 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 @Designate(ocd = DeviceTransactionManagerImpl.Configuration.class)
50 public final class DeviceTransactionManagerImpl implements DeviceTransactionManager {
51 @ObjectClassDefinition
52 public @interface Configuration {
53 @AttributeDefinition(description = "Minimum number of threads in the checking pool", min = "0")
54 int checkingMinThreads() default DEFAULT_CHECKING_MIN_THREADS;
55 @AttributeDefinition(description = "Number of threads in the listening pool", min = "1")
56 int listeningThreads() default DEFAULT_LISTENING_THREADS;
57 @AttributeDefinition(description = "Maximum time to wait for transaction submit, in milliseconds", min = "0")
58 long maxDurationToSubmit() default DEFAULT_MAX_DURATION_TO_SUBMIT;
59 @AttributeDefinition(description = "Maximum time to wait for get-data submit, in milliseconds", min = "0")
60 long maxDurationToGetData() default DEFAULT_MAX_DURATION_TO_GET_DATA;
63 // TODO cache device data brokers
64 // TODO remove disconnected devices from maps
66 private static final Logger LOG = LoggerFactory.getLogger(DeviceTransactionManagerImpl.class);
67 private static final long DEFAULT_MAX_DURATION_TO_GET_DATA = 3000;
68 private static final long DEFAULT_MAX_DURATION_TO_SUBMIT = 15000;
69 private static final int DEFAULT_CHECKING_MIN_THREADS = 4;
70 private static final int DEFAULT_LISTENING_THREADS = 4;
72 private final MountPointService mountPointService;
73 private final ScheduledExecutorService checkingExecutor;
74 private final ListeningExecutorService listeningExecutor;
75 private final ConcurrentMap<String, CountDownLatch> deviceLocks = new ConcurrentHashMap<>();
76 private final long maxDurationToSubmitTransaction;
77 private final long maxDurationToGetData;
80 public DeviceTransactionManagerImpl(@Reference MountPointService mountPointService, Configuration configuration) {
81 this(mountPointService, configuration.maxDurationToSubmit(), configuration.maxDurationToGetData(),
82 configuration.checkingMinThreads(), configuration.listeningThreads());
85 public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction) {
86 this(mountPointService, maxDurationToSubmitTransaction, DEFAULT_MAX_DURATION_TO_GET_DATA,
87 DEFAULT_CHECKING_MIN_THREADS, DEFAULT_LISTENING_THREADS);
90 public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction,
91 long maxDurationToGetData, int checkingPoolMinThreads, int listeningPoolThreads) {
92 this.mountPointService = requireNonNull(mountPointService);
93 this.maxDurationToSubmitTransaction = maxDurationToSubmitTransaction;
94 this.maxDurationToGetData = maxDurationToGetData;
95 this.checkingExecutor = Executors.newScheduledThreadPool(checkingPoolMinThreads);
96 this.listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(listeningPoolThreads));
100 public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId) {
101 return getDeviceTransaction(deviceId, maxDurationToSubmitTransaction, TimeUnit.MILLISECONDS);
105 public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId, long timeoutToSubmit,
107 CountDownLatch newLock = new CountDownLatch(1);
108 ListenableFuture<Optional<DeviceTransaction>> future = listeningExecutor.submit(() -> {
109 LOG.debug("Starting creation of transaction for device {}.", deviceId);
110 // get current lock from device and set new lock
111 CountDownLatch actualLock = swapActualLock(deviceId, newLock);
112 if (actualLock != null) {
113 // if lock was present on device wait until it unlocks
117 Optional<DataBroker> deviceDataBrokerOpt = getDeviceDataBroker(deviceId);
118 DataBroker deviceDataBroker;
119 if (deviceDataBrokerOpt.isPresent()) {
120 deviceDataBroker = deviceDataBrokerOpt.get();
123 return Optional.empty();
125 LOG.debug("Created transaction for device {}.", deviceId);
126 return Optional.of(new DeviceTransaction(deviceDataBroker.newReadWriteTransaction(), newLock));
129 Futures.addCallback(future, new FutureCallback<Optional<DeviceTransaction>>() {
131 public void onSuccess(Optional<DeviceTransaction> deviceTransactionOptional) {
132 // creates timeout for transaction to submit right after transaction is created
133 // if time will run out and transaction was not closed then it will be cancelled (and unlocked)
134 checkingExecutor.schedule(() -> {
135 if (deviceTransactionOptional.isPresent()) {
136 DeviceTransaction deviceTx = deviceTransactionOptional.get();
137 LOG.debug("Timeout to submit transaction run out! Transaction was {} submitted or canceled.",
138 deviceTx.wasSubmittedOrCancelled().get() ? "" : "not");
139 if (!deviceTx.wasSubmittedOrCancelled().get()) {
141 "Transaction for node {} not submitted/canceled after {} ms. Cancelling transaction.",
142 deviceId, timeoutToSubmit);
146 }, timeoutToSubmit, timeUnit);
150 public void onFailure(Throwable throwable) {
151 LOG.error("Exception thrown while getting device transaction for device {}! Unlocking device.",
152 deviceId, throwable);
155 }, checkingExecutor);
160 private synchronized CountDownLatch swapActualLock(String deviceId, CountDownLatch newLock) {
161 return deviceLocks.put(deviceId, newLock);
164 private Optional<DataBroker> getDeviceDataBroker(String deviceId) {
165 Optional<MountPoint> netconfNode = getDeviceMountPoint(deviceId);
166 if (netconfNode.isPresent()) {
167 return netconfNode.get().getService(DataBroker.class);
169 LOG.error("Device mount point not found for : {}", deviceId);
170 return Optional.empty();
175 public Optional<MountPoint> getDeviceMountPoint(String deviceId) {
176 InstanceIdentifier<Node> netconfNodeIID = InstanceIdentifiers.NETCONF_TOPOLOGY_II.child(Node.class,
177 new NodeKey(new NodeId(deviceId)));
178 return mountPointService.getMountPoint(netconfNodeIID);
182 public <T extends DataObject> Optional<T> getDataFromDevice(String deviceId,
183 LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<T> path, long timeout, TimeUnit timeUnit) {
184 Optional<DeviceTransaction> deviceTxOpt;
186 deviceTxOpt = getDeviceTransaction(deviceId, timeout, timeUnit).get();
187 } catch (InterruptedException | ExecutionException e) {
188 LOG.error("Exception thrown while getting transaction for device {}!", deviceId, e);
189 return Optional.empty();
191 if (deviceTxOpt.isPresent()) {
192 DeviceTransaction deviceTx = deviceTxOpt.get();
194 return deviceTx.read(logicalDatastoreType, path).get(timeout, timeUnit);
195 } catch (InterruptedException | ExecutionException | TimeoutException e) {
196 LOG.error("Exception thrown while reading data from device {}! IID: {}", deviceId, path, e);
198 deviceTx.commit(maxDurationToGetData, TimeUnit.MILLISECONDS);
201 LOG.error("Could not obtain transaction for device {}!", deviceId);
203 return Optional.empty();
207 public boolean isDeviceMounted(String deviceId) {
208 return getDeviceDataBroker(deviceId).isPresent();
212 public void preDestroy() {
213 checkingExecutor.shutdown();
214 listeningExecutor.shutdown();
217 public long getMaxDurationToSubmitTransaction() {
218 return maxDurationToSubmitTransaction;