mdsalutil-api clean up Checkstyle violations (not enforced yet)
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / utils / batching / ResourceBatchingManager.java
1 /*
2  * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.utils.batching;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledThreadPoolExecutor;
19 import java.util.concurrent.TimeUnit;
20 import org.apache.commons.lang3.tuple.ImmutablePair;
21 import org.apache.commons.lang3.tuple.Pair;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
26 import org.opendaylight.yangtools.yang.binding.DataObject;
27 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 public class ResourceBatchingManager implements AutoCloseable {
32
33     private static final Logger LOG = LoggerFactory.getLogger(ResourceBatchingManager.class);
34
35     private static final int INITIAL_DELAY = 3000;
36     private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
37
38     private final ConcurrentHashMap<String, Pair<BlockingQueue, ResourceHandler>>
39         resourceHandlerMapper = new ConcurrentHashMap<>();
40
41     private final ConcurrentHashMap<String, ScheduledThreadPoolExecutor>
42         resourceBatchingThreadMapper = new ConcurrentHashMap<>();
43
44     private static ResourceBatchingManager instance;
45
46     static {
47         instance = new ResourceBatchingManager();
48     }
49
50     public static ResourceBatchingManager getInstance() {
51         return instance;
52     }
53
54     @Override
55     public void close() throws Exception {
56         LOG.trace("ResourceBatchingManager Closed");
57     }
58
59     public void registerBatchableResource(
60             String resourceType, final BlockingQueue<ActionableResource> resQueue, final ResourceHandler resHandler) {
61         Preconditions.checkNotNull(resQueue, "ResourceQueue to use for batching cannot not be null.");
62         Preconditions.checkNotNull(resHandler, "ResourceHandler cannot not be null.");
63         if (resourceHandlerMapper.contains(resourceType)) {
64             throw new RuntimeException("Resource type already registered");
65         }
66         resourceHandlerMapper.put(resourceType, new ImmutablePair<>(resQueue, resHandler));
67         ScheduledThreadPoolExecutor resDelegatorService = (ScheduledThreadPoolExecutor)
68                 Executors.newScheduledThreadPool(1);
69         resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
70         LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
71                 resHandler.getBatchSize(), resHandler.getBatchInterval());
72         if (resDelegatorService.getPoolSize() == 0 ) {
73             resDelegatorService.scheduleWithFixedDelay(
74                     new Batcher(resourceType), INITIAL_DELAY, resHandler.getBatchInterval(), TIME_UNIT);
75         }
76     }
77
78     public void put(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
79         BlockingQueue<ActionableResource> queue = getQueue(resourceType);
80         if (queue != null) {
81             ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
82                     identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
83             queue.add(actResource);
84         }
85     }
86
87     public void merge(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
88         BlockingQueue<ActionableResource> queue = getQueue(resourceType);
89         if (queue != null) {
90             ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
91                     identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
92             queue.add(actResource);
93         }
94     }
95
96     public void delete(String resourceType, InstanceIdentifier identifier) {
97         BlockingQueue<ActionableResource> queue = getQueue(resourceType);
98         if (queue != null) {
99             ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
100                     identifier, ActionableResource.DELETE, null, null/*oldData*/);
101             queue.add(actResource);
102         }
103     }
104
105     private BlockingQueue<ActionableResource> getQueue(String resourceType) {
106         if (resourceHandlerMapper.containsKey(resourceType)) {
107             return resourceHandlerMapper.get(resourceType).getLeft();
108         }
109         return null;
110     }
111
112
113     public void deregisterBatchableResource(String resourceType) {
114         resourceHandlerMapper.remove(resourceType);
115         resourceBatchingThreadMapper.remove(resourceType);
116     }
117
118     private class Batcher implements Runnable {
119         private final String resourceType;
120
121         Batcher(String resourceType) {
122             this.resourceType = resourceType;
123         }
124
125         @Override
126         public void run() {
127             List<ActionableResource> resList = new ArrayList<>();
128
129             try {
130                 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
131                 if (resMapper == null) {
132                     LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
133                     return;
134                 }
135                 BlockingQueue<ActionableResource> resQueue = resMapper.getLeft();
136                 ResourceHandler resHandler = resMapper.getRight();
137                 resList.add(resQueue.take());
138                 resQueue.drainTo(resList);
139
140                 long start = System.currentTimeMillis();
141                 int batchSize = resHandler.getBatchSize();
142
143                 int batches = resList.size() / batchSize;
144                 if (resList.size() > batchSize) {
145                     LOG.info("Batched up resources of size {} into batches {} for resourcetype {}",
146                             resList.size(), batches, resourceType);
147                     for (int i = 0, j = 0; i < batches; j = j + batchSize,i++) {
148                         new MdsalDsTask<>(resourceType, resList.subList(j, j + batchSize)).process();
149                     }
150                     // process remaining routes
151                     LOG.trace("Picked up 1 size {} ", resList.subList(batches * batchSize, resList.size()).size());
152                     new MdsalDsTask<>(resourceType, resList.subList(batches * batchSize, resList.size())).process();
153                 } else {
154                     // process less than OR == batchsize routes
155                     LOG.trace("Picked up 2 size {}", resList.size());
156                     new MdsalDsTask<>(resourceType, resList).process();
157                 }
158
159                 long timetaken = System.currentTimeMillis() - start;
160                 LOG.info("Total taken ##time = {}ms for resourceList of size {} for resourceType {}",
161                         timetaken, resList.size(), resourceType);
162
163             } catch (InterruptedException e) {
164                 LOG.error("InterruptedException during run()", e);
165             }
166
167         }
168     }
169
170     private class MdsalDsTask<T extends DataObject> {
171         String resourceType;
172         List<ActionableResource> actResourceList;
173
174         MdsalDsTask(String resourceType, List<ActionableResource> actResourceList) {
175             this.resourceType = resourceType;
176             this.actResourceList = actResourceList;
177         }
178
179         @SuppressWarnings("checkstyle:IllegalCatch")
180         public void process() {
181             InstanceIdentifier<T> identifier;
182             Object instance;
183             try {
184                 LOG.trace("Picked up 3 size {} of resourceType {}", actResourceList.size(), resourceType);
185                 Pair<BlockingQueue, ResourceHandler> resMapper = resourceHandlerMapper.get(resourceType);
186                 if (resMapper == null) {
187                     LOG.error("Unable to find resourceMapper for batching the ResourceType {}", resourceType);
188                     return;
189                 }
190                 ResourceHandler resHandler = resMapper.getRight();
191                 DataBroker broker = resHandler.getResourceBroker();
192                 LogicalDatastoreType dsType = resHandler.getDatastoreType();
193                 WriteTransaction tx = broker.newWriteOnlyTransaction();
194                 List<SubTransaction> transactionObjects = new ArrayList<>();
195                 for (ActionableResource actResource : actResourceList) {
196                     switch (actResource.getAction()) {
197                         case ActionableResource.CREATE:
198                             identifier = actResource.getInstanceIdentifier();
199                             instance = actResource.getInstance();
200                             resHandler.create(tx, dsType, identifier, instance,transactionObjects);
201                             break;
202                         case ActionableResource.UPDATE:
203                             identifier = actResource.getInstanceIdentifier();
204                             Object updated = actResource.getInstance();
205                             Object original = actResource.getOldInstance();
206                             resHandler.update(tx, dsType, identifier, original, updated,transactionObjects);
207                             break;
208                         case ActionableResource.DELETE:
209                             identifier = actResource.getInstanceIdentifier();
210                             instance = actResource.getInstance();
211                             resHandler.delete(tx, dsType, identifier, instance,transactionObjects);
212                             break;
213                         default:
214                             LOG.error("Unable to determine Action for ResourceType {} with ResourceKey {}",
215                                     resourceType, actResource.getKey());
216                     }
217                 }
218
219                 long start = System.currentTimeMillis();
220                 CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
221
222                 try {
223                     futures.get();
224                     long time = System.currentTimeMillis() - start;
225                     LOG.trace("##### Time taken for {} = {}ms", actResourceList.size(), time);
226
227                 } catch (InterruptedException | ExecutionException e) {
228                     LOG.error("Exception occurred while batch writing to datastore", e);
229                     LOG.info("Trying to submit transaction operations one at a time for resType {}", resourceType);
230                     for (SubTransaction object : transactionObjects) {
231                         WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
232                         switch (object.getAction()) {
233                             case SubTransaction.CREATE :
234                                 writeTransaction.put(dsType, object.getInstanceIdentifier(),
235                                     (DataObject) object.getInstance(), true);
236                                 break;
237                             case SubTransaction.DELETE :
238                                 writeTransaction.delete(dsType, object.getInstanceIdentifier());
239                                 break;
240                             case SubTransaction.UPDATE :
241                                 writeTransaction.merge(dsType, object.getInstanceIdentifier(),
242                                     (DataObject) object.getInstance(), true);
243                                 break;
244                             default:
245                                 LOG.error("Unable to determine Action for transaction object with id {}",
246                                     object.getInstanceIdentifier());
247                         }
248                         try {
249                             writeTransaction.submit().get();
250                         } catch (InterruptedException | ExecutionException exception) {
251                             LOG.error("Error {} to datastore (path, data) : ({}, {})", object.getAction(),
252                                     object.getInstanceIdentifier(), object.getInstance(), exception);
253                         }
254                     }
255                 }
256
257             } catch (final Exception e) {
258                 LOG.error("Transaction submission failed", e);
259             }
260         }
261     }
262 }