Bulk merge of l2gw changes
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / BatchedTransaction.java
1 /*
2  * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.l2gw.ha;
9
10 import com.google.common.util.concurrent.FluentFuture;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.Collection;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
23 import org.opendaylight.mdsal.binding.util.Datastore;
24 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
25 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler;
28 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
29 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
30 import org.opendaylight.yangtools.yang.binding.DataObject;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class BatchedTransaction<D extends Datastore> implements TypedReadWriteTransaction<D> {
36
37     private static final Logger LOG = LoggerFactory.getLogger(BatchedTransaction.class);
38     private static Map<InstanceIdentifier, ListenableFuture<Void>> configInProgress = new ConcurrentHashMap<>();
39     private static Map<InstanceIdentifier, ListenableFuture<Void>> opInProgress = new ConcurrentHashMap<>();
40
41     private Map<InstanceIdentifier, ListenableFuture<Void>> currentOps = new ConcurrentHashMap<>();
42
43     private SettableFuture<Void> result = SettableFuture.create();
44     private boolean updateMetric;
45     private NodeId srcNodeId;
46     private Class<D> type;
47
48     public BatchedTransaction(Class<D> logicalDatastoreType) {
49         this.type = logicalDatastoreType;
50     }
51
52     public ListenableFuture<Void> getResult() {
53         return result;
54     }
55
56     @Override
57     public <T extends DataObject> FluentFuture<Optional<T>> read(InstanceIdentifier<T> instanceIdentifier) {
58         try {
59             return ResourceBatchingManager.getInstance()
60                 .read(getShard().name(), instanceIdentifier);
61         } catch (ExecutionException | InterruptedException e) {
62             LOG.error("BatchTxn failed to Read {}", instanceIdentifier);
63         }
64         return FluentFutures.immediateFailedFluentFuture(new Throwable());
65     }
66
67     @Override
68     public FluentFuture<Boolean> exists(InstanceIdentifier<?> path) {
69         // NOT SUPPORTED
70         return FluentFutures.immediateFailedFluentFuture(new Throwable());
71     }
72
73     ResourceBatchingManager.ShardResource getShard() {
74         if (Configuration.class.equals(type)) {
75             return ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY;
76         }
77         return ResourceBatchingManager.ShardResource.OPERATIONAL_TOPOLOGY;
78     }
79
80     public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
81         InstanceIdentifier instanceIdentifier, ListenableFuture<Void> ft) {
82         markUpdateInProgress(type, instanceIdentifier, ft, "");
83
84     }
85
86     public static synchronized <D extends Datastore> void markUpdateInProgress(Class<D> type,
87         InstanceIdentifier instanceIdentifier,ListenableFuture<Void> ft, String desc) {
88         if (Configuration.class.equals(type)) {
89 //            NodeKey nodeKey = (NodeKey) instanceIdentifier.firstKeyOf(Node.class);
90             configInProgress.put(instanceIdentifier, ft);
91             Futures.addCallback(ft, new FutureCallback<Void>() {
92                 @Override
93                 public void onSuccess(Void result) {
94                     configInProgress.remove(instanceIdentifier);
95                 }
96
97                 @Override
98                 public void onFailure(Throwable throwable) {
99                     configInProgress.remove(instanceIdentifier);
100                     LOG.error("Failed to update mdsal op {}", instanceIdentifier, throwable);
101                 }
102             }, MoreExecutors.directExecutor());
103         } else {
104             opInProgress.put(instanceIdentifier, ft);
105             Futures.addCallback(ft, new FutureCallback<Void>() {
106                 @Override
107                 public void onSuccess(Void result) {
108                     opInProgress.remove(instanceIdentifier);
109                 }
110
111                 @Override
112                 public void onFailure(Throwable throwable) {
113                     opInProgress.remove(instanceIdentifier);
114                 }
115             }, MoreExecutors.directExecutor());
116         }
117     }
118
119     public static synchronized boolean isInProgress(LogicalDatastoreType logicalDatastoreType,
120                                                     InstanceIdentifier instanceIdentifier) {
121
122         ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
123         return ft != null && !ft.isDone() && !ft.isCancelled();
124     }
125
126     public static synchronized boolean addCallbackIfInProgress(LogicalDatastoreType logicalDatastoreType,
127                                                                InstanceIdentifier instanceIdentifier,
128                                                                Runnable runnable) {
129
130         ListenableFuture<Void> ft = getInprogressFt(logicalDatastoreType, instanceIdentifier);
131         if (ft != null && !ft.isDone() && !ft.isCancelled()) {
132             Futures.addCallback(ft, new FutureCallback<Void>() {
133                 @Override
134                 public void onSuccess(Void result) {
135                     HAJobScheduler.getInstance().submitJob(runnable);
136                 }
137
138                 @Override
139                 public void onFailure(Throwable throwable) {
140                     HAJobScheduler.getInstance().submitJob(runnable);
141                 }
142             }, MoreExecutors.directExecutor());
143             return true;
144         }
145         if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
146             configInProgress.remove(instanceIdentifier);
147         } else {
148             opInProgress.remove(instanceIdentifier);
149         }
150         return false;
151     }
152
153     static ListenableFuture<Void> getInprogressFt(LogicalDatastoreType logicalDatastoreType,
154                                                   InstanceIdentifier instanceIdentifier) {
155         if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) {
156             return configInProgress.get(instanceIdentifier);
157         } else {
158             return opInProgress.get(instanceIdentifier);
159         }
160     }
161
162     public void waitForCompletion() {
163         if (currentOps.isEmpty()) {
164             return;
165         }
166         Collection<ListenableFuture<Void>> fts = currentOps.values();
167         for (ListenableFuture<Void> ft : fts) {
168             try {
169                 ft.get();
170             } catch (InterruptedException | ExecutionException e) {
171                 LOG.error("Failed to get ft result ", e);
172             }
173         }
174     }
175
176     @Override
177     public <T extends DataObject> void put(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
178         ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().put(getShard(),
179                 instanceIdentifier, dataObj);
180         markUpdateInProgress(type, instanceIdentifier, ft);
181         currentOps.put(instanceIdentifier, ft);
182     }
183
184     @Override
185     public <T extends DataObject> void mergeParentStructurePut(InstanceIdentifier<T> path, T data) {
186
187     }
188
189     @Override
190     public <T extends DataObject> void merge(InstanceIdentifier<T> instanceIdentifier, T dataObj) {
191         ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().merge(getShard(),
192                 instanceIdentifier, dataObj);
193         markUpdateInProgress(type, instanceIdentifier, ft);
194         currentOps.put(instanceIdentifier, ft);
195     }
196
197     @Override
198     public <T extends DataObject> void mergeParentStructureMerge(InstanceIdentifier<T> path,
199         T data) {
200         //NOT SUPPORTED
201     }
202
203     public ListenableFuture<Void> getFt(InstanceIdentifier instanceIdentifier) {
204         return currentOps.get(instanceIdentifier);
205     }
206
207     @Override
208     public void delete(InstanceIdentifier<?> instanceIdentifier) {
209         ListenableFuture<Void> ft = ResourceBatchingManager.getInstance().delete(getShard(),
210                 instanceIdentifier);
211         markUpdateInProgress(type, instanceIdentifier, ft);
212         currentOps.put(instanceIdentifier, ft);
213     }
214
215     public ListenableFuture<Void> submit() {
216         if (currentOps.isEmpty()) {
217             return Futures.immediateFuture(null);
218         }
219         Collection<ListenableFuture<Void>> fts = currentOps.values();
220         AtomicInteger waitCount = new AtomicInteger(fts.size());
221         for (ListenableFuture<Void> ft : fts) {
222             Futures.addCallback(ft, new FutureCallback<Void>() {
223                 @Override
224                 public void onSuccess(Void voidResult) {
225                     if (waitCount.decrementAndGet() == 0) {
226                         result.set(null);
227                     }
228                 }
229
230                 @Override
231                 public void onFailure(Throwable throwable) {
232                     result.setException(throwable);
233                 }
234             }, MoreExecutors.directExecutor());
235         }
236         return result;
237     }
238
239     @Override
240     public Object getIdentifier() {
241         return "BatchedTransaction";
242     }
243
244     public void setSrcNodeId(NodeId srcNodeId) {
245         this.srcNodeId = srcNodeId;
246     }
247
248     public NodeId getSrcNodeId() {
249         return srcNodeId;
250     }
251
252     public boolean updateMetric() {
253         return updateMetric;
254     }
255
256     public void updateMetric(Boolean update) {
257         this.updateMetric = update;
258     }
259 }