2 * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.l2gw.ha;
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.Collection;
18 import java.util.Optional;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
24 import org.opendaylight.mdsal.binding.api.query.QueryExpression;
25 import org.opendaylight.mdsal.binding.api.query.QueryResult;
26 import org.opendaylight.mdsal.binding.util.Datastore;
27 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
28 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
29 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
30 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
32 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 public class BatchedTransaction<D extends Datastore> implements TypedReadWriteTransaction<D> {
40 private static final Logger LOG = LoggerFactory.getLogger(BatchedTransaction.class);
41 private static Map<InstanceIdentifier, ListenableFuture<Void>> configInProgress = new ConcurrentHashMap<>();
42 private static Map<InstanceIdentifier, ListenableFuture<Void>> opInProgress = new ConcurrentHashMap<>();
44 private Map<InstanceIdentifier, ListenableFuture<Void>> currentOps = new ConcurrentHashMap<>();
46 private SettableFuture<Void> result = SettableFuture.create();
47 private boolean updateMetric;
48 private NodeId srcNodeId;
49 private Class<D> type;
/**
 * Creates a batched transaction bound to the given datastore marker class.
 *
 * @param logicalDatastoreType datastore marker class, later compared with {@code Configuration}
 *                             when the target shard is selected
 */
public BatchedTransaction(Class<D> logicalDatastoreType) {
    type = logicalDatastoreType;
}
55 public ListenableFuture<Void> getResult() {
60 public <T extends DataObject> FluentFuture<Optional<T>> read(InstanceIdentifier<T> instanceIdentifier) {
62 return ResourceBatchingManager.getInstance()
63 .read(getShard().name(), instanceIdentifier);
64 } catch (ExecutionException | InterruptedException e) {
65 LOG.error("BatchTxn failed to Read {}", instanceIdentifier);
67 return FluentFutures.immediateFailedFluentFuture(new Throwable());
71 public FluentFuture<Boolean> exists(InstanceIdentifier<?> path) {
73 return FluentFutures.immediateFailedFluentFuture(new Throwable());
76 ResourceBatchingManager.ShardResource getShard() {
77 if (Configuration.class.equals(type)) {
78 return ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY;
80 return ResourceBatchingManager.ShardResource.OPERATIONAL_TOPOLOGY;
83 public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
84 InstanceIdentifier instanceIdentifier, ListenableFuture<Void> ft) {
85 markUpdateInProgress(type, instanceIdentifier, ft, "");
89 public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
90 InstanceIdentifier instanceIdentifier,ListenableFuture<Void> ft, String desc) {
91 if (Configuration.class.equals(type)) {
92 // NodeKey nodeKey = (NodeKey) instanceIdentifier.firstKeyOf(Node.class);
93 configInProgress.put(instanceIdentifier, ft);
94 Futures.addCallback(ft, new FutureCallback<Void>() {
96 public void onSuccess(Void result) {
97 configInProgress.remove(instanceIdentifier);
101 public void onFailure(Throwable throwable) {
102 configInProgress.remove(instanceIdentifier);
103 LOG.error("Failed to update mdsal op {}", instanceIdentifier, throwable);
105 }, MoreExecutors.directExecutor());
107 opInProgress.put(instanceIdentifier, ft);
108 Futures.addCallback(ft, new FutureCallback<Void>() {
110 public void onSuccess(Void result) {
111 opInProgress.remove(instanceIdentifier);
115 public void onFailure(Throwable throwable) {
116 opInProgress.remove(instanceIdentifier);
118 }, MoreExecutors.directExecutor());
122 public static synchronized boolean isInProgress(LogicalDatastoreType logicalDatastoreType,
123 InstanceIdentifier instanceIdentifier) {
125 ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
126 return ft != null && !ft.isDone() && !ft.isCancelled();
129 public static synchronized boolean addCallbackIfInProgress(LogicalDatastoreType logicalDatastoreType,
130 InstanceIdentifier instanceIdentifier,
133 ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
134 if (ft != null && !ft.isDone() && !ft.isCancelled()) {
135 Futures.addCallback(ft, new FutureCallback<Void>() {
137 public void onSuccess(Void result) {
138 HAJobScheduler.getInstance().submitJob(runnable);
142 public void onFailure(Throwable throwable) {
143 HAJobScheduler.getInstance().submitJob(runnable);
145 }, MoreExecutors.directExecutor());
148 if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
149 configInProgress.remove(instanceIdentifier);
151 opInProgress.remove(instanceIdentifier);
156 static ListenableFuture<Void> getInprogressFt(LogicalDatastoreType logicalDatastoreType,
157 InstanceIdentifier instanceIdentifier) {
158 if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
159 return configInProgress.get(instanceIdentifier);
161 return opInProgress.get(instanceIdentifier);
165 public void waitForCompletion() {
166 if (currentOps.isEmpty()) {
169 Collection<ListenableFuture<Void>> fts = currentOps.values();
170 for (ListenableFuture<Void> ft : fts) {
173 } catch (InterruptedException | ExecutionException e) {
174 LOG.error("Failed to get ft result ", e);
180 public <T extends DataObject> void put(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
181 ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().put(getShard(),
182 instanceIdentifier, dataObj);
183 markUpdateInProgress(type, instanceIdentifier, ft);
184 currentOps.put(instanceIdentifier, ft);
188 public <T extends DataObject> void mergeParentStructurePut(InstanceIdentifier<T> path, T data) {
193 public <T extends DataObject> void merge(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
194 ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().merge(getShard(),
195 instanceIdentifier, dataObj);
196 markUpdateInProgress(type, instanceIdentifier, ft);
197 currentOps.put(instanceIdentifier, ft);
201 public <T extends DataObject> void mergeParentStructureMerge(InstanceIdentifier<T> path,
206 public ListenableFuture<Void> getFt(InstanceIdentifier instanceIdentifier) {
207 return currentOps.get(instanceIdentifier);
211 public void delete(InstanceIdentifier<?> instanceIdentifier) {
212 ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().delete(getShard(),
214 markUpdateInProgress(type, instanceIdentifier, ft);
215 currentOps.put(instanceIdentifier, ft);
218 public ListenableFuture<Void> submit() {
219 if (currentOps.isEmpty()) {
220 return Futures.immediateFuture(null);
222 Collection<ListenableFuture<Void>> fts = currentOps.values();
223 AtomicInteger waitCount = new AtomicInteger(fts.size());
224 for (ListenableFuture<Void> ft : fts) {
225 Futures.addCallback(ft, new FutureCallback<Void>() {
227 public void onSuccess(Void voidResult) {
228 if (waitCount.decrementAndGet() == 0) {
234 public void onFailure(Throwable throwable) {
235 result.setException(throwable);
237 }, MoreExecutors.directExecutor());
243 public Object getIdentifier() {
244 return "BatchedTransaction";
247 public void setSrcNodeId(NodeId srcNodeId) {
248 this.srcNodeId = srcNodeId;
251 public NodeId getSrcNodeId() {
255 public boolean updateMetric() {
259 public void updateMetric(Boolean update) {
260 this.updateMetric = update;
264 public <T extends @NonNull DataObject> FluentFuture<QueryResult<T>> execute(QueryExpression<T> query) {
265 throw new UnsupportedOperationException();