80c4b19d9003ee98370b435916dac9581911fc82
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / utils / batching / ResourceBatchingManager.java
1 /*
2  * Copyright (c) 2015 - 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.utils.batching;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.ArrayList;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.eclipse.jdt.annotation.NonNull;
33 import org.opendaylight.infrautils.utils.concurrent.Executors;
34 import org.opendaylight.mdsal.binding.api.DataBroker;
35 import org.opendaylight.mdsal.binding.api.ReadTransaction;
36 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
37 import org.opendaylight.mdsal.binding.api.WriteTransaction;
38 import org.opendaylight.mdsal.common.api.CommitInfo;
39 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
40 import org.opendaylight.mdsal.common.api.ReadFailedException;
41 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
42 import org.opendaylight.yangtools.yang.binding.DataObject;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * This class lets other modules submit their CRUD methods to it. This class
49  * will then supply a single transaction to such CRUD methods of the
50  * subscribers, on which such subscribers write data to that transaction.
51  * Finally the framework attempts to reliably write this single transaction
52  * which represents a batch of an ordered list of entities owned by that subscriber,
53  * to be written/updated/removed from a specific datastore as registered by the subscriber.
54  */
55 public class ResourceBatchingManager implements AutoCloseable {
56
57     private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
58
59     private static final int INITIAL_DELAY = 3000;
60     private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
61
62     private static final int PERIODICITY_IN_MS = 500;
63     private static final int BATCH_SIZE = 1000;
64
65     public enum ShardResource {
66         CONFIG_TOPOLOGY(LogicalDatastoreType.CONFIGURATION),
67         OPERATIONAL_TOPOLOGY(LogicalDatastoreType.OPERATIONAL),
68         CONFIG_INVENTORY(LogicalDatastoreType.CONFIGURATION),
69         OPERATIONAL_INVENTORY(LogicalDatastoreType.OPERATIONAL);
70
71         BlockingQueue<ActionableResource> queue = new LinkedBlockingQueue<>();
72         LogicalDatastoreType datastoreType;
73
74         ShardResource(LogicalDatastoreType datastoreType) {
75             this.datastoreType = datastoreType;
76         }
77
78         public LogicalDatastoreType getDatastoreType() {
79             return datastoreType;
80         }
81
82         BlockingQueue<ActionableResource> getQueue() {
83             return queue;
84         }
85     }
86
87     private final ConcurrentHashMap<String, Pair<BlockingQueue<ActionableResource>, ResourceHandler>>
88             resourceHandlerMapper = new ConcurrentHashMap<>();
89
90     private final ConcurrentHashMap<String, ScheduledExecutorService>
91             resourceBatchingThreadMapper = new ConcurrentHashMap<>();
92
93     private final Map<String, Set<InstanceIdentifier<?>>> pendingModificationByResourceType = new ConcurrentHashMap<>();
94
95     private static ResourceBatchingManager instance;
96
97     static {
98         instance = new ResourceBatchingManager();
99     }
100
101     public static ResourceBatchingManager getInstance() {
102         return instance;
103     }
104
105     @Override
106     public void close() {
107         LOG.trace("ResourceBatchingManager Closed, closing all batched resources");
108         resourceBatchingThreadMapper.values().forEach(ScheduledExecutorService::shutdown);
109     }
110
111     public void registerBatchableResource(
112             String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
113         Preconditions.checkNotNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
114         Preconditions.checkNotNull(resHandler, "ResourceHandler cannot not be null.");
115
116         resourceHandlerMapper.put(resourceType, new ImmutablePair<>(resQueue, resHandler));
117         ScheduledExecutorService resDelegatorService =
118                 Executors.newListeningScheduledThreadPool(1, "ResourceBatchingManager", LOG);
119         resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
120         LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
121                 resHandler.getBatchSize(), resHandler.getBatchInterval());
122         resDelegatorService.scheduleWithFixedDelay(
123                 new Batcher(resourceType), resHandler.getBatchInterval(), resHandler.getBatchInterval(), TIME_UNIT);
124         pendingModificationByResourceType.putIfAbsent(resourceType, ConcurrentHashMap.newKeySet());
125     }
126
127     public void registerDefaultBatchHandlers(DataBroker broker) {
128         LOG.trace("Registering default batch handlers");
129         Integer batchSize = Integer.getInteger("resource.manager.batch.size", BATCH_SIZE);
130         Integer batchInterval = Integer.getInteger("resource.manager.batch.periodicity.ms", PERIODICITY_IN_MS);
131
132         for (ShardResource shardResource : ShardResource.values()) {
133             if (resourceHandlerMapper.containsKey(shardResource.name())) {
134                 continue;
135             }
136             DefaultBatchHandler batchHandler = new DefaultBatchHandler(broker, shardResource.datastoreType, batchSize,
137                     batchInterval);
138             registerBatchableResource(shardResource.name(), shardResource.getQueue(), batchHandler);
139         }
140     }
141
142     private void beforeModification(String resoureType, InstanceIdentifier<?> iid) {
143         pendingModificationByResourceType.get(resoureType).add(iid);
144     }
145
146     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
147         justification = "https://github.com/spotbugs/spotbugs/issues/811")
148     private void afterModification(String resoureType, InstanceIdentifier<?> iid) {
149         pendingModificationByResourceType.get(resoureType).remove(iid);
150     }
151
152     /**
153      * Reads the identifier of the given resource type.
154      * Not to be used by the applications  which uses their own resource queue
155      *
156      * @param resourceType resource type that was registered with batch manager
157      * @param identifier   identifier to be read
158      * @param <T> DataObject subclass
159      * @return a CheckFuture containing the result of the read
160      */
161     public <T extends DataObject> FluentFuture<Optional<T>> read(
162             String resourceType, InstanceIdentifier<T> identifier) {
163         BlockingQueue<ActionableResource> queue = getQueue(resourceType);
164         if (queue != null) {
165             if (pendingModificationByResourceType.get(resourceType).contains(identifier)) {
166                 SettableFuture<Optional<T>> readFuture = SettableFuture.create();
167                 queue.add(new ActionableReadResource<>(identifier, readFuture));
168                 return FluentFuture.from(Futures.makeChecked(readFuture, ReadFailedException.MAPPER));
169             } else {
170                 ResourceHandler resourceHandler = resourceHandlerMapper.get(resourceType).getRight();
171                 try (ReadTransaction tx = resourceHandler.getResourceBroker().newReadOnlyTransaction()) {
172                     return tx.read(resourceHandler.getDatastoreType(), identifier);
173                 }
174             }
175         }
176
177         return FluentFutures.immediateFailedFluentFuture(new ReadFailedException(
178                 "No batch handler was registered for resource " + resourceType));
179     }
180
181     public ListenableFuture<Void> merge(ShardResource shardResource, InstanceIdentifier<?> identifier,
182                                         DataObject updatedData) {
183         BlockingQueue<ActionableResource> queue = shardResource.getQueue();
184         if (queue != null) {
185             beforeModification(shardResource.name(), identifier);
186             ActionableResource actResource = new ActionableResourceImpl(
187                     identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
188             queue.add(actResource);
189             return actResource.getResultFuture();
190         }
191         return Futures
192                 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
193                         + shardResource.name()));
194     }
195
196     public void merge(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
197         BlockingQueue<ActionableResource> queue = getQueue(resourceType);
198         if (queue != null) {
199             beforeModification(resourceType, identifier);
200             ActionableResource actResource = new ActionableResourceImpl(
201                     identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
202             queue.add(actResource);
203         }
204     }
205
206     public ListenableFuture<Void> delete(ShardResource shardResource, InstanceIdentifier<?> identifier) {
207         BlockingQueue<ActionableResource> queue = shardResource.getQueue();
208         if (queue != null) {
209             beforeModification(shardResource.name(), identifier);
210             ActionableResource actResource = new ActionableResourceImpl(
211                     identifier, ActionableResource.DELETE, null, null/*oldData*/);
212             queue.add(actResource);
213             return actResource.getResultFuture();
214         }
215         return Futures
216                 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
217                         + shardResource.name()));
218     }
219
220     public void delete(String resourceType, InstanceIdentifier<?> identifier) {
221         BlockingQueue<ActionableResource> queue = getQueue(resourceType);
222         if (queue != null) {
223             beforeModification(resourceType, identifier);
224             ActionableResource actResource = new ActionableResourceImpl(
225                     identifier, ActionableResource.DELETE, null, null/*oldData*/);
226             queue.add(actResource);
227         }
228     }
229
230     public ListenableFuture<Void> put(ShardResource shardResource, InstanceIdentifier<?> identifier,
231                                       DataObject updatedData) {
232         BlockingQueue<ActionableResource> queue = shardResource.getQueue();
233         if (queue != null) {
234             beforeModification(shardResource.name(), identifier);
235             ActionableResource actResource = new ActionableResourceImpl(
236                     identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
237             queue.add(actResource);
238             return actResource.getResultFuture();
239         }
240         return Futures
241                 .immediateFailedFuture(new IllegalStateException("Queue missing for provided shardResource "
242                         + shardResource.name()));
243     }
244
245     public void put(String resourceType, InstanceIdentifier<?> identifier, DataObject updatedData) {
246         BlockingQueue<ActionableResource> queue = getQueue(resourceType);
247         if (queue != null) {
248             beforeModification(resourceType, identifier);
249             ActionableResource actResource = new ActionableResourceImpl(
250                     identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
251             queue.add(actResource);
252         }
253     }
254
255     private BlockingQueue<ActionableResource> getQueue(String resourceType) {
256         if (resourceHandlerMapper.containsKey(resourceType)) {
257             return resourceHandlerMapper.get(resourceType).getLeft();
258         }
259         return null;
260     }
261
262     public void deregisterBatchableResource(String resourceType) {
263         ScheduledExecutorService scheduledThreadPoolExecutor = resourceBatchingThreadMapper.get(resourceType);
264         if (scheduledThreadPoolExecutor != null) {
265             scheduledThreadPoolExecutor.shutdown();
266         }
267         resourceHandlerMapper.remove(resourceType);
268         resourceBatchingThreadMapper.remove(resourceType);
269     }
270
271     private class Batcher implements Runnable {
272         private final String resourceType;
273
274         Batcher(String resourceType) {
275             this.resourceType = resourceType;
276         }
277
278         @Override
279         public void run() {
280             List<ActionableResource> resList = new ArrayList<>();
281
282             try {
283                 Pair<BlockingQueue<ActionableResource>, ResourceHandler> resMapper =
284                         resourceHandlerMapper.get(resourceType);
285                 if (resMapper == null) {
286                     LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
287                     return;
288                 }
289                 BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
290                 ResourceHandler resHandler = resMapper.getRight();
291                 resList.add(resQueue.take());
292                 resQueue.drainTo(resList);
293
294                 long start = System.currentTimeMillis();
295                 int batchSize = resHandler.getBatchSize();
296
297                 int batches = resList.size() / batchSize;
298                 if (resList.size() > batchSize) {
299                     LOG.info("Batched up resources of size {} into batches {} for resourcetype {}",
300                             resList.size(), batches, resourceType);
301                     for (int i = 0, j = 0; i < batches; j = j + batchSize,i++) {
302                         new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
303                     }
304                     // process remaining routes
305                     LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
306                     new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
307                 } else {
308                     // process less than OR == batchsize routes
309                     LOG.trace("Picked up 2 size {}", resList.size());
310                     new MdsalDsTask<>(resourceType, resList).process();
311                 }
312
313                 long timetaken = System.currentTimeMillis() - start;
314                 LOG.debug("Total taken ##time = {}ms for resourceList of size {} for resourceType {}",
315                         timetaken, resList.size(), resourceType);
316
317             } catch (InterruptedException e) {
318                 LOG.error("InterruptedException during run()", e);
319             }
320
321         }
322     }
323
324     private class MdsalDsTask<T extends DataObject> {
325         String resourceType;
326         List<ActionableResource> actResourceList;
327
328         MdsalDsTask(String resourceType, List<ActionableResource> actResourceList) {
329             this.resourceType = resourceType;
330             this.actResourceList = actResourceList;
331         }
332
333         @SuppressWarnings("unchecked")
334         public void process() {
335             LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
336             Pair<BlockingQueue<ActionableResource>, ResourceHandler> resMapper =
337                     resourceHandlerMapper.get(resourceType);
338             if (resMapper == null) {
339                 LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
340                 return;
341             }
342             ResourceHandler resHandler = resMapper.getRight();
343             DataBroker broker = resHandler.getResourceBroker();
344             LogicalDatastoreType dsType = resHandler.getDatastoreType();
345             ReadWriteTransaction tx = broker.newReadWriteTransaction();
346             List<SubTransaction> transactionObjects = new ArrayList<>();
347             Map<SubTransaction, SettableFuture<Void>> txMap = new HashMap<>();
348             for (ActionableResource actResource : actResourceList) {
349                 int startSize = transactionObjects.size();
350                 switch (actResource.getAction()) {
351                     case ActionableResource.CREATE:
352                         resHandler.create(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
353                                 transactionObjects);
354                         break;
355                     case ActionableResource.UPDATE:
356                         Object updated = actResource.getInstance();
357                         Object original = actResource.getOldInstance();
358                         resHandler.update(tx, dsType, actResource.getInstanceIdentifier(), original,
359                                 updated,transactionObjects);
360                         break;
361                     case ActionableResource.UPDATECONTAINER:
362                         Object updatedContainer = actResource.getInstance();
363                         Object originalContainer = actResource.getOldInstance();
364                         resHandler.updateContainer(tx, dsType, actResource.getInstanceIdentifier(),
365                                 originalContainer, updatedContainer,transactionObjects);
366                         break;
367                     case ActionableResource.DELETE:
368                         resHandler.delete(tx, dsType, actResource.getInstanceIdentifier(), actResource.getInstance(),
369                                 transactionObjects);
370                         break;
371                     case ActionableResource.READ:
372                         ActionableReadResource<DataObject> readAction = (ActionableReadResource<DataObject>)actResource;
373                         ListenableFuture<Optional<DataObject>> future =
374                                 tx.read(dsType, readAction.getInstanceIdentifier());
375                         Futures.addCallback(future, new FutureCallback<Optional<DataObject>>() {
376                             @Override
377                             public void onSuccess(Optional<DataObject> result) {
378                                 readAction.getReadFuture().set(result);
379                             }
380
381                             @Override
382                             public void onFailure(Throwable failure) {
383                                 readAction.getReadFuture().setException(failure);
384                             }
385                         }, MoreExecutors.directExecutor());
386                         break;
387                     default:
388                         LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
389                                 resourceType, actResource);
390                 }
391                 int endSize = transactionObjects.size();
392                 if (endSize > startSize) {
393                     txMap.put(transactionObjects.get(endSize - 1),
394                             (SettableFuture<Void>) actResource.getResultFuture());
395                 }
396             }
397
398
399             long start = System.currentTimeMillis();
400             FluentFuture<? extends @NonNull CommitInfo> futures = tx.commit();
401
402             try {
403                 futures.get();
404                 actResourceList.forEach(actionableResource -> {
405                     ((SettableFuture<Void>) actionableResource.getResultFuture()).set(null);
406                     postCommit(actionableResource.getAction(), actionableResource.getInstanceIdentifier());
407                 });
408                 long time = System.currentTimeMillis() - start;
409                 LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);
410
411             } catch (InterruptedException | ExecutionException e) {
412                 LOG.error("Exception occurred while batch writing to datastore", e);
413                 LOG.info("Trying to submit transaction operations one at a time for resType {}", resourceType);
414                 for (SubTransaction object : transactionObjects) {
415                     WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
416                     switch (object.getAction()) {
417                         case SubTransaction.CREATE:
418                             writeTransaction.put(dsType, object.getInstanceIdentifier(),
419                                     (DataObject) object.getInstance(), true);
420                             break;
421                         case SubTransaction.DELETE:
422                             writeTransaction.delete(dsType, object.getInstanceIdentifier());
423                             break;
424                         case SubTransaction.UPDATE:
425                             writeTransaction.merge(dsType, object.getInstanceIdentifier(),
426                                     (DataObject) object.getInstance(), true);
427                             break;
428                         default:
429                             LOG.error("Unable to determine Action for transaction object with id {}",
430                                     object.getInstanceIdentifier());
431                     }
432                     FluentFuture<? extends @NonNull CommitInfo> futureOperation = writeTransaction.commit();
433                     try {
434                         futureOperation.get();
435                         if (txMap.containsKey(object)) {
436                             txMap.get(object).set(null);
437                         } else {
438                             LOG.error("Subtx object {} has no Actionable-resource associated with it !! ",
439                                     object.getInstanceIdentifier());
440                         }
441                     } catch (InterruptedException | ExecutionException exception) {
442                         if (txMap.containsKey(object)) {
443                             txMap.get(object).setException(exception);
444                         }
445                         LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
446                                 object.getInstanceIdentifier(), object.getInstance(), exception);
447                     } finally {
448                         postCommit(object.getAction(), object.getInstanceIdentifier());
449                     }
450                 }
451             }
452         }
453
454         private void postCommit(int action, InstanceIdentifier iid) {
455             switch (action) {
456                 case ActionableResource.CREATE:
457                 case ActionableResource.UPDATE:
458                 case ActionableResource.DELETE:
459                     afterModification(resourceType, iid);
460                     break;
461                 default:
462                     break;
463             }
464         }
465     }
466
467     private static class ActionableReadResource<T extends DataObject> extends ActionableResourceImpl {
468         private final SettableFuture<Optional<T>> readFuture;
469
470         ActionableReadResource(InstanceIdentifier<T> identifier, SettableFuture<Optional<T>> readFuture) {
471             super(identifier, ActionableResource.READ, null, null);
472             this.readFuture = readFuture;
473         }
474
475         SettableFuture<Optional<T>> getReadFuture() {
476             return readFuture;
477         }
478     }
479 }