2 * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.ha;
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.Collection;
18 import java.util.Optional;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
23 import org.opendaylight.mdsal.binding.util.Datastore;
24 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
25 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler;
28 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
29 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
30 import org.opendaylight.yangtools.yang.binding.DataObject;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 public class BatchedTransaction<D extends Datastore> implements TypedReadWriteTransaction<D> {
37 private static final Logger LOG = LoggerFactory.getLogger(BatchedTransaction.class);
38 private static Map<InstanceIdentifier, ListenableFuture<Void>> configInProgress = new ConcurrentHashMap<>();
39 private static Map<InstanceIdentifier, ListenableFuture<Void>> opInProgress = new ConcurrentHashMap<>();
41 private Map<InstanceIdentifier, ListenableFuture<Void>> currentOps = new ConcurrentHashMap<>();
43 private SettableFuture<Void> result = SettableFuture.create();
44 private boolean updateMetric;
45 private NodeId srcNodeId;
46 private Class<D> type;
48 public BatchedTransaction(Class<D> logicalDatastoreType) {
49 this.type = logicalDatastoreType;
52 public ListenableFuture<Void> getResult() {
57 public <T extends DataObject> FluentFuture<Optional<T>> read(InstanceIdentifier<T> instanceIdentifier) {
59 return ResourceBatchingManager.getInstance()
60 .read(getShard().name(), instanceIdentifier);
61 } catch (ExecutionException | InterruptedException e) {
62 LOG.error("BatchTxn failed to Read {}", instanceIdentifier);
64 return FluentFutures.immediateFailedFluentFuture(new Throwable());
68 public FluentFuture<Boolean> exists(InstanceIdentifier<?> path) {
70 return FluentFutures.immediateFailedFluentFuture(new Throwable());
73 ResourceBatchingManager.ShardResource getShard() {
74 if (Configuration.class.equals(type)) {
75 return ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY;
77 return ResourceBatchingManager.ShardResource.OPERATIONAL_TOPOLOGY;
80 public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
81 InstanceIdentifier instanceIdentifier, ListenableFuture<Void> ft) {
82 markUpdateInProgress(type, instanceIdentifier, ft, "");
86 public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
87 InstanceIdentifier instanceIdentifier,ListenableFuture<Void> ft, String desc) {
88 if (Configuration.class.equals(type)) {
89 // NodeKey nodeKey = (NodeKey) instanceIdentifier.firstKeyOf(Node.class);
90 configInProgress.put(instanceIdentifier, ft);
91 Futures.addCallback(ft, new FutureCallback<Void>() {
93 public void onSuccess(Void result) {
94 configInProgress.remove(instanceIdentifier);
98 public void onFailure(Throwable throwable) {
99 configInProgress.remove(instanceIdentifier);
100 LOG.error("Failed to update mdsal op {}", instanceIdentifier, throwable);
102 }, MoreExecutors.directExecutor());
104 opInProgress.put(instanceIdentifier, ft);
105 Futures.addCallback(ft, new FutureCallback<Void>() {
107 public void onSuccess(Void result) {
108 opInProgress.remove(instanceIdentifier);
112 public void onFailure(Throwable throwable) {
113 opInProgress.remove(instanceIdentifier);
115 }, MoreExecutors.directExecutor());
119 public static synchronized boolean isInProgress(LogicalDatastoreType logicalDatastoreType,
120 InstanceIdentifier instanceIdentifier) {
122 ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
123 return ft != null && !ft.isDone() && !ft.isCancelled();
126 public static synchronized boolean addCallbackIfInProgress(LogicalDatastoreType logicalDatastoreType,
127 InstanceIdentifier instanceIdentifier,
130 ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
131 if (ft != null && !ft.isDone() && !ft.isCancelled()) {
132 Futures.addCallback(ft, new FutureCallback<Void>() {
134 public void onSuccess(Void result) {
135 HAJobScheduler.getInstance().submitJob(runnable);
139 public void onFailure(Throwable throwable) {
140 HAJobScheduler.getInstance().submitJob(runnable);
142 }, MoreExecutors.directExecutor());
145 if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
146 configInProgress.remove(instanceIdentifier);
148 opInProgress.remove(instanceIdentifier);
153 static ListenableFuture<Void> getInprogressFt(LogicalDatastoreType logicalDatastoreType,
154 InstanceIdentifier instanceIdentifier) {
155 if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
156 return configInProgress.get(instanceIdentifier);
158 return opInProgress.get(instanceIdentifier);
162 public void waitForCompletion() {
163 if (currentOps.isEmpty()) {
166 Collection<ListenableFuture<Void>> fts = currentOps.values();
167 for (ListenableFuture<Void> ft : fts) {
170 } catch (InterruptedException | ExecutionException e) {
171 LOG.error("Failed to get ft result ", e);
177 public <T extends DataObject> void put(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
178 ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().put(getShard(),
179 instanceIdentifier, dataObj);
180 markUpdateInProgress(type, instanceIdentifier, ft);
181 currentOps.put(instanceIdentifier, ft);
185 public <T extends DataObject> void mergeParentStructurePut(InstanceIdentifier<T> path, T data) {
190 public <T extends DataObject> void merge(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
191 ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().merge(getShard(),
192 instanceIdentifier, dataObj);
193 markUpdateInProgress(type, instanceIdentifier, ft);
194 currentOps.put(instanceIdentifier, ft);
198 public <T extends DataObject> void mergeParentStructureMerge(InstanceIdentifier<T> path,
203 public ListenableFuture<Void> getFt(InstanceIdentifier instanceIdentifier) {
204 return currentOps.get(instanceIdentifier);
208 public void delete(InstanceIdentifier<?> instanceIdentifier) {
209 ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().delete(getShard(),
211 markUpdateInProgress(type, instanceIdentifier, ft);
212 currentOps.put(instanceIdentifier, ft);
215 public ListenableFuture<Void> submit() {
216 if (currentOps.isEmpty()) {
217 return Futures.immediateFuture(null);
219 Collection<ListenableFuture<Void>> fts = currentOps.values();
220 AtomicInteger waitCount = new AtomicInteger(fts.size());
221 for (ListenableFuture<Void> ft : fts) {
222 Futures.addCallback(ft, new FutureCallback<Void>() {
224 public void onSuccess(Void voidResult) {
225 if (waitCount.decrementAndGet() == 0) {
231 public void onFailure(Throwable throwable) {
232 result.setException(throwable);
234 }, MoreExecutors.directExecutor());
240 public Object getIdentifier() {
241 return "BatchedTransaction";
244 public void setSrcNodeId(NodeId srcNodeId) {
245 this.srcNodeId = srcNodeId;
248 public NodeId getSrcNodeId() {
252 public boolean updateMetric() {
256 public void updateMetric(Boolean update) {
257 this.updateMetric = update;