Merge "Add functional tests for regenerator type"
[transportpce.git] / common / src / main / java / org / opendaylight / transportpce / common / device / DeviceTransactionManagerImpl.java
1 /*
2  * Copyright © 2017 Orange, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.transportpce.common.device;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Optional;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.MountPoint;
30 import org.opendaylight.mdsal.binding.api.MountPointService;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.transportpce.common.InstanceIdentifiers;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.osgi.service.component.annotations.Activate;
39 import org.osgi.service.component.annotations.Component;
40 import org.osgi.service.component.annotations.Deactivate;
41 import org.osgi.service.component.annotations.Reference;
42 import org.osgi.service.metatype.annotations.AttributeDefinition;
43 import org.osgi.service.metatype.annotations.Designate;
44 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 @Component
49 @Designate(ocd = DeviceTransactionManagerImpl.Configuration.class)
50 public final class DeviceTransactionManagerImpl implements DeviceTransactionManager {
51     @ObjectClassDefinition
52     public @interface Configuration {
53         @AttributeDefinition(description = "Minimum number of threads in the checking pool", min = "0")
54         int checkingMinThreads() default DEFAULT_CHECKING_MIN_THREADS;
55         @AttributeDefinition(description = "Number of threads in the listening pool", min = "1")
56         int listeningThreads() default DEFAULT_LISTENING_THREADS;
57         @AttributeDefinition(description = "Maximum time to wait for transaction submit, in milliseconds", min = "0")
58         long maxDurationToSubmit() default DEFAULT_MAX_DURATION_TO_SUBMIT;
59         @AttributeDefinition(description = "Maximum time to wait for get-data submit, in milliseconds", min = "0")
60         long maxDurationToGetData() default DEFAULT_MAX_DURATION_TO_GET_DATA;
61     }
62
63     // TODO cache device data brokers
64     // TODO remove disconnected devices from maps
65
66     private static final Logger LOG = LoggerFactory.getLogger(DeviceTransactionManagerImpl.class);
67     private static final long DEFAULT_MAX_DURATION_TO_GET_DATA = 3000;
68     private static final long DEFAULT_MAX_DURATION_TO_SUBMIT = 15000;
69     private static final int DEFAULT_CHECKING_MIN_THREADS = 4;
70     private static final int DEFAULT_LISTENING_THREADS = 4;
71
72     private final MountPointService mountPointService;
73     private final ScheduledExecutorService checkingExecutor;
74     private final ListeningExecutorService listeningExecutor;
75     private final ConcurrentMap<String, CountDownLatch> deviceLocks = new ConcurrentHashMap<>();
76     private final long maxDurationToSubmitTransaction;
77     private final long maxDurationToGetData;
78
79     @Activate
80     public DeviceTransactionManagerImpl(@Reference MountPointService mountPointService, Configuration configuration) {
81         this(mountPointService, configuration.maxDurationToSubmit(), configuration.maxDurationToGetData(),
82             configuration.checkingMinThreads(), configuration.listeningThreads());
83     }
84
85     public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction) {
86         this(mountPointService, maxDurationToSubmitTransaction, DEFAULT_MAX_DURATION_TO_GET_DATA,
87             DEFAULT_CHECKING_MIN_THREADS, DEFAULT_LISTENING_THREADS);
88     }
89
90     public DeviceTransactionManagerImpl(MountPointService mountPointService, long maxDurationToSubmitTransaction,
91             long maxDurationToGetData, int checkingPoolMinThreads, int listeningPoolThreads) {
92         this.mountPointService = requireNonNull(mountPointService);
93         this.maxDurationToSubmitTransaction = maxDurationToSubmitTransaction;
94         this.maxDurationToGetData = maxDurationToGetData;
95         this.checkingExecutor = Executors.newScheduledThreadPool(checkingPoolMinThreads);
96         this.listeningExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(listeningPoolThreads));
97     }
98
99     @Override
100     public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId) {
101         return getDeviceTransaction(deviceId, maxDurationToSubmitTransaction, TimeUnit.MILLISECONDS);
102     }
103
104     @Override
105     public Future<Optional<DeviceTransaction>> getDeviceTransaction(String deviceId, long timeoutToSubmit,
106             TimeUnit timeUnit) {
107         CountDownLatch newLock = new CountDownLatch(1);
108         ListenableFuture<Optional<DeviceTransaction>> future = listeningExecutor.submit(() -> {
109             LOG.debug("Starting creation of transaction for device {}.", deviceId);
110             // get current lock from device and set new lock
111             CountDownLatch actualLock = swapActualLock(deviceId, newLock);
112             if (actualLock != null) {
113                 // if lock was present on device wait until it unlocks
114                 actualLock.await();
115             }
116
117             Optional<DataBroker> deviceDataBrokerOpt = getDeviceDataBroker(deviceId);
118             DataBroker deviceDataBroker;
119             if (deviceDataBrokerOpt.isPresent()) {
120                 deviceDataBroker = deviceDataBrokerOpt.get();
121             } else {
122                 newLock.countDown();
123                 return Optional.empty();
124             }
125             LOG.debug("Created transaction for device {}.", deviceId);
126             return Optional.of(new DeviceTransaction(deviceDataBroker.newReadWriteTransaction(), newLock));
127         });
128
129         Futures.addCallback(future, new FutureCallback<Optional<DeviceTransaction>>() {
130             @Override
131             public void onSuccess(Optional<DeviceTransaction> deviceTransactionOptional) {
132                 // creates timeout for transaction to submit right after transaction is created
133                 // if time will run out and transaction was not closed then it will be cancelled (and unlocked)
134                 checkingExecutor.schedule(() -> {
135                     if (deviceTransactionOptional.isPresent()) {
136                         DeviceTransaction deviceTx = deviceTransactionOptional.get();
137                         LOG.debug("Timeout to submit transaction run out! Transaction was {} submitted or canceled.",
138                                 deviceTx.wasSubmittedOrCancelled().get() ? "" : "not");
139                         if (!deviceTx.wasSubmittedOrCancelled().get()) {
140                             LOG.error(
141                                 "Transaction for node {} not submitted/canceled after {} ms. Cancelling transaction.",
142                                 deviceId, timeoutToSubmit);
143                             deviceTx.cancel();
144                         }
145                     }
146                 }, timeoutToSubmit, timeUnit);
147             }
148
149             @Override
150             public void onFailure(Throwable throwable) {
151                 LOG.error("Exception thrown while getting device transaction for device {}! Unlocking device.",
152                         deviceId, throwable);
153                 newLock.countDown();
154             }
155         }, checkingExecutor);
156
157         return future;
158     }
159
160     private synchronized CountDownLatch swapActualLock(String deviceId, CountDownLatch newLock) {
161         return deviceLocks.put(deviceId, newLock);
162     }
163
164     private Optional<DataBroker> getDeviceDataBroker(String deviceId) {
165         Optional<MountPoint> netconfNode = getDeviceMountPoint(deviceId);
166         if (netconfNode.isPresent()) {
167             return netconfNode.get().getService(DataBroker.class);
168         } else {
169             LOG.error("Device mount point not found for : {}", deviceId);
170             return Optional.empty();
171         }
172     }
173
174     @Override
175     public Optional<MountPoint> getDeviceMountPoint(String deviceId) {
176         InstanceIdentifier<Node> netconfNodeIID = InstanceIdentifiers.NETCONF_TOPOLOGY_II.child(Node.class,
177                 new NodeKey(new NodeId(deviceId)));
178         return mountPointService.getMountPoint(netconfNodeIID);
179     }
180
181     @Override
182     public <T extends DataObject> Optional<T> getDataFromDevice(String deviceId,
183             LogicalDatastoreType logicalDatastoreType, InstanceIdentifier<T> path, long timeout, TimeUnit timeUnit) {
184         Optional<DeviceTransaction> deviceTxOpt;
185         try {
186             deviceTxOpt = getDeviceTransaction(deviceId, timeout, timeUnit).get();
187         } catch (InterruptedException | ExecutionException e) {
188             LOG.error("Exception thrown while getting transaction for device {}!", deviceId, e);
189             return Optional.empty();
190         }
191         if (deviceTxOpt.isPresent()) {
192             DeviceTransaction deviceTx = deviceTxOpt.get();
193             try {
194                 return deviceTx.read(logicalDatastoreType, path).get(timeout, timeUnit);
195             } catch (InterruptedException | ExecutionException | TimeoutException e) {
196                 LOG.error("Exception thrown while reading data from device {}! IID: {}", deviceId, path, e);
197             } finally {
198                 deviceTx.commit(maxDurationToGetData, TimeUnit.MILLISECONDS);
199             }
200         } else {
201             LOG.error("Could not obtain transaction for device {}!", deviceId);
202         }
203         return Optional.empty();
204     }
205
206     @Override
207     public boolean isDeviceMounted(String deviceId) {
208         return getDeviceDataBroker(deviceId).isPresent();
209     }
210
211     @Deactivate
212     public void preDestroy() {
213         checkingExecutor.shutdown();
214         listeningExecutor.shutdown();
215     }
216
217     public long getMaxDurationToSubmitTransaction() {
218         return maxDurationToSubmitTransaction;
219     }
220 }