Freeze upstream versions
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / BatchedTransaction.java
1 /*
2  * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha;
9
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.Collection;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
24 import org.opendaylight.mdsal.binding.api.query.QueryExpression;
25 import org.opendaylight.mdsal.binding.api.query.QueryResult;
26 import org.opendaylight.mdsal.binding.util.Datastore;
27 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
28 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
29 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
30 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
32 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 public class BatchedTransaction<D extends Datastore> implements TypedReadWriteTransaction<D> {
39
40     private static final Logger LOG = LoggerFactory.getLogger(BatchedTransaction.class);
41     private static Map<InstanceIdentifier, ListenableFuture<Void>> configInProgress = new ConcurrentHashMap<>();
42     private static Map<InstanceIdentifier, ListenableFuture<Void>> opInProgress = new ConcurrentHashMap<>();
43
44     private Map<InstanceIdentifier, ListenableFuture<Void>> currentOps = new ConcurrentHashMap<>();
45
46     private SettableFuture<Void> result = SettableFuture.create();
47     private boolean updateMetric;
48     private NodeId srcNodeId;
49     private Class<D> type;
50
51     public BatchedTransaction(Class<D> logicalDatastoreType) {
52         this.type = logicalDatastoreType;
53     }
54
55     public ListenableFuture<Void> getResult() {
56         return result;
57     }
58
59     @Override
60     public <T extends DataObject> FluentFuture<Optional<T>> read(InstanceIdentifier<T> instanceIdentifier) {
61         try {
62             return ResourceBatchingManager.getInstance()
63                 .read(getShard().name(), instanceIdentifier);
64         } catch (ExecutionException | InterruptedException e) {
65             LOG.error("BatchTxn failed to Read {}", instanceIdentifier);
66         }
67         return FluentFutures.immediateFailedFluentFuture(new Throwable());
68     }
69
70     @Override
71     public FluentFuture<Boolean> exists(InstanceIdentifier<?> path) {
72         // NOT SUPPORTED
73         return FluentFutures.immediateFailedFluentFuture(new Throwable());
74     }
75
76     ResourceBatchingManager.ShardResource getShard() {
77         if (Configuration.class.equals(type)) {
78             return ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY;
79         }
80         return ResourceBatchingManager.ShardResource.OPERATIONAL_TOPOLOGY;
81     }
82
83     public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
84         InstanceIdentifier instanceIdentifier, ListenableFuture<Void> ft) {
85         markUpdateInProgress(type, instanceIdentifier, ft, "");
86
87     }
88
89     public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
90         InstanceIdentifier instanceIdentifier,ListenableFuture<Void> ft, String desc) {
91         if (Configuration.class.equals(type)) {
92 //            NodeKey nodeKey = (NodeKey) instanceIdentifier.firstKeyOf(Node.class);
93             configInProgress.put(instanceIdentifier, ft);
94             Futures.addCallback(ft, new FutureCallback<Void>() {
95                 @Override
96                 public void onSuccess(Void result) {
97                     configInProgress.remove(instanceIdentifier);
98                 }
99
100                 @Override
101                 public void onFailure(Throwable throwable) {
102                     configInProgress.remove(instanceIdentifier);
103                     LOG.error("Failed to update mdsal op {}", instanceIdentifier, throwable);
104                 }
105             }, MoreExecutors.directExecutor());
106         } else {
107             opInProgress.put(instanceIdentifier, ft);
108             Futures.addCallback(ft, new FutureCallback<Void>() {
109                 @Override
110                 public void onSuccess(Void result) {
111                     opInProgress.remove(instanceIdentifier);
112                 }
113
114                 @Override
115                 public void onFailure(Throwable throwable) {
116                     opInProgress.remove(instanceIdentifier);
117                 }
118             }, MoreExecutors.directExecutor());
119         }
120     }
121
122     public static synchronized boolean isInProgress(LogicalDatastoreType logicalDatastoreType,
123                                                     InstanceIdentifier instanceIdentifier) {
124
125         ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
126         return ft != null && !ft.isDone() && !ft.isCancelled();
127     }
128
129     public static synchronized boolean addCallbackIfInProgress(LogicalDatastoreType logicalDatastoreType,
130                                                                InstanceIdentifier instanceIdentifier,
131                                                                Runnable runnable) {
132
133         ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
134         if (ft != null && !ft.isDone() && !ft.isCancelled()) {
135             Futures.addCallback(ft, new FutureCallback<Void>() {
136                 @Override
137                 public void onSuccess(Void result) {
138                     HAJobScheduler.getInstance().submitJob(runnable);
139                 }
140
141                 @Override
142                 public void onFailure(Throwable throwable) {
143                     HAJobScheduler.getInstance().submitJob(runnable);
144                 }
145             }, MoreExecutors.directExecutor());
146             return true;
147         }
148         if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
149             configInProgress.remove(instanceIdentifier);
150         } else {
151             opInProgress.remove(instanceIdentifier);
152         }
153         return false;
154     }
155
156     static ListenableFuture<Void> getInprogressFt(LogicalDatastoreType logicalDatastoreType,
157                                                   InstanceIdentifier instanceIdentifier) {
158         if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
159             return configInProgress.get(instanceIdentifier);
160         } else {
161             return opInProgress.get(instanceIdentifier);
162         }
163     }
164
165     public void waitForCompletion() {
166         if (currentOps.isEmpty()) {
167             return;
168         }
169         Collection<ListenableFuture<Void>> fts = currentOps.values();
170         for (ListenableFuture<Void> ft : fts) {
171             try {
172                 ft.get();
173             } catch (InterruptedException | ExecutionException e) {
174                 LOG.error("Failed to get ft result ", e);
175             }
176         }
177     }
178
179     @Override
180     public <T extends DataObject> void put(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
181         ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().put(getShard(),
182                 instanceIdentifier, dataObj);
183         markUpdateInProgress(type, instanceIdentifier, ft);
184         currentOps.put(instanceIdentifier, ft);
185     }
186
187     @Override
188     public <T extends DataObject> void mergeParentStructurePut(InstanceIdentifier<T> path, T data) {
189
190     }
191
192     @Override
193     public <T extends DataObject> void merge(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
194         ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().merge(getShard(),
195                 instanceIdentifier, dataObj);
196         markUpdateInProgress(type, instanceIdentifier, ft);
197         currentOps.put(instanceIdentifier, ft);
198     }
199
200     @Override
201     public <T extends DataObject> void mergeParentStructureMerge(InstanceIdentifier<T> path,
202         T data) {
203         //NOT SUPPORTED
204     }
205
206     public ListenableFuture<Void> getFt(InstanceIdentifier instanceIdentifier) {
207         return currentOps.get(instanceIdentifier);
208     }
209
210     @Override
211     public void delete(InstanceIdentifier<?> instanceIdentifier) {
212         ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().delete(getShard(),
213                 instanceIdentifier);
214         markUpdateInProgress(type, instanceIdentifier, ft);
215         currentOps.put(instanceIdentifier, ft);
216     }
217
218     public ListenableFuture<Void> submit() {
219         if (currentOps.isEmpty()) {
220             return Futures.immediateFuture(null);
221         }
222         Collection<ListenableFuture<Void>> fts = currentOps.values();
223         AtomicInteger waitCount = new AtomicInteger(fts.size());
224         for (ListenableFuture<Void> ft : fts) {
225             Futures.addCallback(ft, new FutureCallback<Void>() {
226                 @Override
227                 public void onSuccess(Void voidResult) {
228                     if (waitCount.decrementAndGet() == 0) {
229                         result.set(null);
230                     }
231                 }
232
233                 @Override
234                 public void onFailure(Throwable throwable) {
235                     result.setException(throwable);
236                 }
237             }, MoreExecutors.directExecutor());
238         }
239         return result;
240     }
241
242     @Override
243     public Object getIdentifier() {
244         return "BatchedTransaction";
245     }
246
247     public void setSrcNodeId(NodeId srcNodeId) {
248         this.srcNodeId = srcNodeId;
249     }
250
251     public NodeId getSrcNodeId() {
252         return srcNodeId;
253     }
254
255     public boolean updateMetric() {
256         return updateMetric;
257     }
258
259     public void updateMetric(Boolean update) {
260         this.updateMetric = update;
261     }
262
263     @Override
264     public <T extends @NonNull DataObject> FluentFuture<QueryResult<T>> execute(QueryExpression<T> query) {
265         throw new UnsupportedOperationException();
266     }
267 }