Optional clean-up
[genius.git] / idmanager / idmanager-impl / src / main / java / org / opendaylight / genius / idmanager / IdManager.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.idmanager;
10
11 import static java.util.Comparator.comparing;
12 import static java.util.stream.Collectors.toCollection;
13 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
14
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.ListenableFuture;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
39
40 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
41 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
42 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
43 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
44 import org.opendaylight.daexim.DataImportBootReady;
45 import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
46 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
47 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
48 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
49 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
50 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
51 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
52 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
53 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
76 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
77 import org.opendaylight.yangtools.yang.common.OperationFailedException;
78 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
79 import org.opendaylight.yangtools.yang.common.RpcResult;
80 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
81 import org.ops4j.pax.cdi.api.OsgiService;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 @Singleton
86 public class IdManager implements IdManagerService, IdManagerMonitor {
87
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
    // Idle threshold in seconds (24 hours). It is added to a child pool's last access
    // time and compared against the current epoch time in seconds.
    private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;

    // Broker used to open write-only transactions directly.
    private final DataBroker broker;
    // Wrapper around the same broker used for synchronous single-transaction reads/updates/deletes.
    private final SingleTransactionDataBroker singleTxDB;
    // Lock service handed to IdUtils.lock()/unlock() and to the update jobs.
    private final LockManagerService lockManager;
    // Shared helper for identifiers, key construction, builders and locking.
    private final IdUtils idUtils;

    // In-memory cache of local pools, keyed by the PARENT pool name.
    private final ConcurrentMap<String, IdLocalPool> localPool;
    // Timer instance; its use is not visible in this part of the file.
    private final Timer cleanJobTimer = new Timer();
98
99     @Inject
100     public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
101             @OsgiService DataImportBootReady dataImportBootReady) throws ReadFailedException {
102         this.broker = db;
103         this.singleTxDB = new SingleTransactionDataBroker(db);
104         this.lockManager = lockManager;
105         this.idUtils = idUtils;
106
107         // NB: We do not "use" the DataImportBootReady, but it's presence in the OSGi
108         // Service Registry is the required "signal" that the Daexim "import on boot"
109         // has fully completed (which we want to wait for).  Therefore, making this
110         // dependent on that defers the Blueprint initialization, as we'd like to,
111         // so that we do not start giving out new IDs before an import went in.
112         // Thus, please DO NOT remove the DataImportBootReady argument, even if
113         // it appears to be (is) un-used from a Java code PoV!
114
115         this.localPool = new ConcurrentHashMap<>();
116         populateCache();
117     }
118
119     @Override
120     public Map<String, String> getLocalPoolsDetails() {
121         Map<String, String> map = new HashMap<>();
122         localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
123         return map;
124     }
125
126     @PostConstruct
127     public void start() {
128         LOG.info("{} start", getClass().getSimpleName());
129     }
130
131     @PreDestroy
132     public void close() {
133         LOG.info("{} close", getClass().getSimpleName());
134     }
135
136     private void populateCache() throws ReadFailedException {
137         // If IP changes during reboot, then there will be orphaned child pools.
138         InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
139         Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
140         if (!idPoolsOptional.isPresent()) {
141             return;
142         }
143         IdPools idPools = idPoolsOptional.get();
144         List<IdPool> idPoolList = idPools.getIdPool();
145         idPoolList
146                 .stream()
147                 .filter(idPool -> idPool.getParentPoolName() != null
148                         && !idPool.getParentPoolName().isEmpty()
149                         && idUtils.getLocalPoolName(idPool.getParentPoolName())
150                                 .equals(idPool.getPoolName()))
151                 .forEach(
152                     idPool -> updateLocalIdPoolCache(idPool,
153                         idPool.getParentPoolName()));
154     }
155
156     public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
157         AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
158         AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
159                 availableIdsHolder.getEnd());
160         availableIdHolder.setCur(availableIdsHolder.getCursor());
161         ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
162         ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
163         releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
164         List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
165         List<DelayedIdEntry> delayedIdEntryInCache = new CopyOnWriteArrayList<>();
166         if (delayedEntries != null) {
167             delayedIdEntryInCache = delayedEntries
168                     .stream()
169                     .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
170                             .getId(), delayedIdEntry.getReadyTimeSec()))
171                             .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
172                             .collect(toCollection(CopyOnWriteArrayList::new));
173         }
174         releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
175
176         IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
177         idLocalPool.setAvailableIds(availableIdHolder);
178         idLocalPool.setReleasedIds(releasedIdHolder);
179         localPool.put(parentPoolName, idLocalPool);
180         if (LOG.isDebugEnabled()) {
181             LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
182         }
183         return true;
184     }
185
186     @Override
187     public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
188         LOG.info("createIdPool called with input {}", input);
189         String poolName = input.getPoolName();
190         long low = input.getLow();
191         long high = input.getHigh();
192         long blockSize = idUtils.computeBlockSize(low, high);
193         Future<RpcResult<Void>> futureResult;
194         try {
195             idUtils.lock(lockManager, poolName);
196             WriteTransaction tx = broker.newWriteOnlyTransaction();
197             poolName = poolName.intern();
198             IdPool idPool;
199             idPool = createGlobalPool(tx, poolName, low, high, blockSize);
200             String localPoolName = idUtils.getLocalPoolName(poolName);
201             IdLocalPool idLocalPool = localPool.get(poolName);
202             if (idLocalPool == null) {
203                 createLocalPool(tx, localPoolName, idPool);
204                 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
205             }
206             tx.submit().checkedGet();
207             futureResult = RpcResultBuilder.<Void>success().buildFuture();
208         } catch (OperationFailedException | IdManagerException e) {
209             futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
210         } finally {
211             idUtils.unlock(lockManager, poolName);
212         }
213         return futureResult;
214     }
215
216     @Override
217     public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
218         LOG.debug("AllocateId called with input {}", input);
219         String idKey = input.getIdKey();
220         String poolName = input.getPoolName();
221         String localPoolName = idUtils.getLocalPoolName(poolName);
222         long newIdValue = -1;
223         AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
224         Future<RpcResult<AllocateIdOutput>> futureResult;
225         try {
226             //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
227             newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
228             output.setIdValue(newIdValue);
229             futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
230         } catch (OperationFailedException | IdManagerException e) {
231             completeExceptionallyIfPresent(poolName, idKey, e);
232             futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
233         }
234         return futureResult;
235     }
236
237     private void completeExceptionallyIfPresent(String poolName, String idKey, Exception exception) {
238         CompletableFuture<List<Long>> completableFuture =
239                 idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey));
240         if (completableFuture != null) {
241             completableFuture.completeExceptionally(exception);
242         }
243     }
244
245     @Override
246     public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
247         if (LOG.isDebugEnabled()) {
248             LOG.debug("AllocateIdRange called with input {}", input);
249         }
250         String idKey = input.getIdKey();
251         String poolName = input.getPoolName();
252         long size = input.getSize();
253         String localPoolName = idUtils.getLocalPoolName(poolName);
254         List<Long> newIdValuesList = new ArrayList<>();
255         AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
256         Future<RpcResult<AllocateIdRangeOutput>> futureResult;
257         try {
258             newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
259             Collections.sort(newIdValuesList);
260             output.setIdValues(newIdValuesList);
261             futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
262         } catch (OperationFailedException | IdManagerException e) {
263             completeExceptionallyIfPresent(poolName, idKey, e);
264             futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
265         }
266         return futureResult;
267     }
268
269     @Override
270     public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
271         if (LOG.isDebugEnabled()) {
272             LOG.debug("DeleteIdPool called with input {}", input);
273         }
274         String poolName = input.getPoolName();
275         Future<RpcResult<Void>> futureResult;
276         try {
277             InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
278             poolName = poolName.intern();
279             synchronized (poolName) {
280                 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
281                 List<ChildPools> childPoolList = idPool.getChildPools();
282                 if (childPoolList != null) {
283                     childPoolList.stream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
284                 }
285                 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
286                 if (LOG.isDebugEnabled()) {
287                     LOG.debug("Deleted id pool {}", poolName);
288                 }
289             }
290             futureResult = RpcResultBuilder.<Void>success().buildFuture();
291         } catch (OperationFailedException e) {
292             futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
293         }
294         return futureResult;
295     }
296
297     @Override
298     public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
299         String poolName = input.getPoolName();
300         String idKey = input.getIdKey();
301         LOG.info("Releasing ID {} from pool {}", idKey, poolName);
302         Future<RpcResult<Void>> futureResult;
303         String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
304         try {
305             idUtils.lock(lockManager, uniqueKey);
306             releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
307             futureResult = RpcResultBuilder.<Void>success().buildFuture();
308         } catch (ReadFailedException | IdManagerException e) {
309             futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
310             idUtils.unlock(lockManager, uniqueKey);
311         }
312         return futureResult;
313     }
314
315     private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
316         if (exception instanceof IdDoesNotExistException) {
317             // Do not log full stack trace in case ID does not exist
318             LOG.error(msg, exception);
319         } else {
320             LOG.error(msg, exception);
321         }
322         RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
323         failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
324         if (exception instanceof OperationFailedException) {
325             failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
326         }
327         return failedRpcResultBuilder.buildFuture();
328     }
329
330     private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
331             String idKey, long size) throws OperationFailedException, IdManagerException {
332         LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
333         String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
334         CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
335         CompletableFuture<List<Long>> existingFutureIdValue =
336                 idUtils.allocatedIdMap.putIfAbsent(uniqueIdKey, futureIdValues);
337         if (existingFutureIdValue != null) {
338             try {
339                 return existingFutureIdValue.get();
340             } catch (InterruptedException | ExecutionException e) {
341                 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
342                         idKey, parentPoolName);
343                 throw new IdManagerException(e.getMessage(), e);
344             }
345         }
346         try {
347             List<Long> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
348                     existingFutureIdValue);
349             if (!newIdValuesList.isEmpty()) {
350                 return newIdValuesList;
351             }
352             //This get will not help in concurrent reads. Hence the same read needs to be done again.
353             IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
354             LOG.info("Got pool {}", localIdPool);
355             long newIdValue = -1;
356             localPoolName = localPoolName.intern();
357             if (size == 1) {
358                 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
359                 newIdValuesList.add(newIdValue);
360             } else {
361                 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
362             }
363             LOG.info("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
364             idUtils.releaseIdLatchMap.put(uniqueIdKey, new CountDownLatch(1));
365             UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
366                     idUtils, lockManager);
367             DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
368             futureIdValues.complete(newIdValuesList);
369             return newIdValuesList;
370         } catch (OperationFailedException | IdManagerException e) {
371             idUtils.unlock(lockManager, uniqueIdKey);
372             throw e;
373         }
374     }
375
376     private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
377             throws OperationFailedException, IdManagerException {
378         while (true) {
379             IdHolder releasedIds = localIdPool.getReleasedIds();
380             Optional<Long> releasedId = Optional.absent();
381             releasedId = releasedIds.allocateId();
382             if (releasedId.isPresent()) {
383                 IdHolderSyncJob poolSyncJob =
384                         new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), broker,
385                                 idUtils);
386                 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
387                         poolSyncJob, IdUtils.RETRY_COUNT);
388                 return releasedId.get();
389             }
390             Optional<Long> availableId = Optional.absent();
391             IdHolder availableIds = localIdPool.getAvailableIds();
392             if (availableIds != null) {
393                 availableId = availableIds.allocateId();
394                 if (availableId.isPresent()) {
395                     IdHolderSyncJob poolSyncJob =
396                             new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(),
397                                     broker, idUtils);
398                     DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
399                             poolSyncJob, IdUtils.RETRY_COUNT);
400                     return availableId.get();
401                 }
402             }
403             long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
404             if (idCount <= 0) {
405                 if (LOG.isDebugEnabled()) {
406                     LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
407                 }
408                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
409             }
410         }
411     }
412
413     /**
414      * Changes made to availableIds and releasedIds will not be persisted to the datastore.
415      */
416     private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
417             throws OperationFailedException, IdManagerException {
418         if (LOG.isDebugEnabled()) {
419             LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
420         }
421         InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
422         parentPoolName = parentPoolName.intern();
423         idUtils.lock(lockManager, parentPoolName);
424         long idCount = 0;
425         try {
426             // Check if the childpool already got id block.
427             long availableIdCount =
428                     localIdPool.getAvailableIds().getAvailableIdCount()
429                             + localIdPool.getReleasedIds().getAvailableIdCount();
430             if (availableIdCount > 0) {
431                 return availableIdCount;
432             }
433             WriteTransaction tx = broker.newWriteOnlyTransaction();
434             IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
435             idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
436             tx.submit().checkedGet();
437         } catch (IdManagerException | NullPointerException e) {
438             LOG.error("Error getting id block from parent pool", e);
439         } finally {
440             idUtils.unlock(lockManager, parentPoolName);
441         }
442         return idCount;
443     }
444
445     private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
446             throws OperationFailedException, IdManagerException {
447         long idCount = -1;
448         ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
449         while (true) {
450             idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
451             if (idCount > 0) {
452                 return idCount;
453             }
454             idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
455             if (idCount > 0) {
456                 return idCount;
457             }
458             idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
459             if (idCount <= 0) {
460                 if (LOG.isDebugEnabled()) {
461                     LOG.debug("Unable to allocate Id block from global pool");
462                 }
463                 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
464             }
465         }
466     }
467
    /**
     * Tries to reclaim ids from one other child pool of the parent that has not been
     * accessed for at least {@code DEFAULT_IDLE_TIME} seconds, adding them to the
     * delayed entries of the parent's released-ids builder and writing the drained
     * child pool back to the datastore.
     *
     * <p>Only the first eligible child pool is processed per call.
     *
     * @param releasedIdsBuilderParent released-ids builder of the parent pool; updated in place
     * @param parentIdPool datastore state of the parent pool
     * @return the value computed as {@code totalAvailableIdCount} for the processed child
     *         pool, or 0 if no child pool was eligible
     * @throws OperationFailedException if reading or updating the child pool fails
     */
    private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
            throws OperationFailedException {
        List<ChildPools> childPoolsList = parentIdPool.getChildPools();
        // Sorting the child pools on last accessed time so that the pool that
        // was not accessed for a long time comes first.
        // NOTE(review): the list is sorted in place and is not null-checked; this throws
        // if getChildPools() returns null or an unmodifiable list - confirm.
        Collections.sort(childPoolsList, comparing(ChildPools::getLastAccessTime));
        long currentTime = System.currentTimeMillis() / 1000;
        for (ChildPools childPools : childPoolsList) {
            // The list is ordered by last access time, so once a pool is still within
            // the idle window all remaining pools are as well.
            if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
                break;
            }
            // Skip this node's own local pool.
            if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
                InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
                        .getIdPoolInstance(childPools.getChildPoolName());
                IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
                ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);

                // Move the child's delayed entries over to the parent.
                // NOTE(review): delayedIdEntriesChild is not null-checked - confirm it cannot be null.
                List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
                List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
                if (delayedIdEntriesParent == null) {
                    delayedIdEntriesParent = new LinkedList<>();
                }
                delayedIdEntriesParent.addAll(delayedIdEntriesChild);
                // Removing the list from itself empties it.
                delayedIdEntriesChild.removeAll(delayedIdEntriesChild);

                // Turn every remaining available id of the child into a delayed entry of
                // the parent, advancing the child's cursor as it goes.
                AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
                while (idUtils.isIdAvailable(availableIds)) {
                    long cursor = availableIds.getCursor() + 1;
                    delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
                    availableIds.setCursor(cursor);
                }

                // NOTE(review): this size is read after the child's list was emptied above;
                // if the builder returns the same list instance it contributes 0 here, and
                // the ids moved by the loop above may not be counted either - confirm the
                // intended count.
                long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
                        + idUtils.getAvailableIdsCount(availableIds);
                long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
                releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
                // Persist the drained state of the other child pool.
                singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
                        new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
                                .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
                                .build());
                return totalAvailableIdCount;
            }
        }
        return 0;
    }
513
514     private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
515             ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
516         if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
517             LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
518             return 0;
519         }
520         List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
521         int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
522         List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
523         ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
524         List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
525         List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
526                 .stream()
527                 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
528                         .getId(), delayedIdEntry.getReadyTimeSec()))
529                 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
530                 .collect(toCollection(CopyOnWriteArrayList::new));
531         delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
532         releasedIds.setDelayedEntries(delayedIdEntriesFromParentPool);
533         releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
534         localIdPool.setReleasedIds(releasedIds);
535         delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
536         releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
537         InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
538                 .builder(IdPools.class).child(IdPool.class,
539                         new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
540         releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
541         LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
542         tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
543                 releasedIdsBuilderParent.build(), true);
544         return idCount;
545     }
546
547     private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
548             WriteTransaction tx) {
549         long idCount = 0;
550         AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
551         long end = availableIdsBuilderParent.getEnd();
552         long cur = availableIdsBuilderParent.getCursor();
553         if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
554             if (LOG.isDebugEnabled()) {
555                 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
556             }
557             return idCount;
558         }
559         // Update availableIdsHolder of Local Pool
560         idCount = Math.min(end - cur, parentIdPool.getBlockSize());
561         AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
562         localIdPool.setAvailableIds(availableIds);
563         // Update availableIdsHolder of Global Pool
564         InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
565                 .builder(IdPools.class).child(IdPool.class,
566                         new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
567         availableIdsBuilderParent.setCursor(cur + idCount);
568         if (LOG.isDebugEnabled()) {
569             LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
570         }
571         tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
572                 availableIdsBuilderParent.build(), true);
573         return idCount;
574     }
575
576     private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
577             throws ReadFailedException, IdManagerException {
578         String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
579         LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
580         CountDownLatch latch = idUtils.releaseIdLatchMap.get(idLatchKey);
581         if (latch != null) {
582             try {
583                 latch.await(10, TimeUnit.SECONDS);
584             } catch (InterruptedException ignored) {
585                 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
586             } finally {
587                 idUtils.releaseIdLatchMap.remove(idLatchKey);
588             }
589         }
590         localPoolName = localPoolName.intern();
591         InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
592         IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
593         List<IdEntries> idEntries = parentIdPool.getIdEntries();
594         List<IdEntries> newIdEntries = idEntries;
595         if (idEntries == null) {
596             throw new IdDoesNotExistException(parentPoolName, idKey);
597         }
598         InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
599         Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
600         if (!existingIdEntryObject.isPresent()) {
601             LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
602             idUtils.unlock(lockManager, idLatchKey);
603             return;
604         }
605         IdEntries existingIdEntry = existingIdEntryObject.get();
606         List<Long> idValuesList = existingIdEntry.getIdValue();
607         IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
608         boolean isRemoved = newIdEntries.remove(existingIdEntry);
609         LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
610         updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
611         IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
612                 idUtils);
613         DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
614         scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
615         LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
616         // Updating id entries in the parent pool. This will be used for restart scenario
617         UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils,
618                         lockManager);
619         DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
620     }
621
622     private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
623             final String parentPoolName, final int blockSize) {
624         TimerTask scheduledTask = new TimerTask() {
625             @Override
626             public void run() {
627                 CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
628                         idUtils);
629                 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
630                         IdUtils.RETRY_COUNT);
631             }
632         };
633         cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
634     }
635
636     private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
637             throws ReadFailedException {
638         IdPool idPool;
639         InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
640         Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
641         if (!existingIdPool.isPresent()) {
642             if (LOG.isDebugEnabled()) {
643                 LOG.debug("Creating new global pool {}", poolName);
644             }
645             idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
646             tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
647         } else {
648             idPool = existingIdPool.get();
649             if (LOG.isDebugEnabled()) {
650                 LOG.debug("GlobalPool exists {}", idPool);
651             }
652         }
653         return idPool;
654     }
655
656     private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
657             throws OperationFailedException, IdManagerException {
658         localPoolName = localPoolName.intern();
659         IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
660         allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
661         String parentPool = idPool.getPoolName();
662         localPool.put(parentPool, idLocalPool);
663         LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
664                 idPool.getBlockSize(), idUtils);
665         DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
666         return idLocalPool;
667     }
668
669     private void deletePool(String poolName) {
670         LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
671         DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
672     }
673
674     public void poolDeleted(String parentPoolName, String poolName) {
675         IdLocalPool idLocalPool = localPool.get(parentPoolName);
676         if (idLocalPool != null) {
677             if (idLocalPool.getPoolName().equals(poolName)) {
678                 localPool.remove(parentPoolName);
679             }
680         }
681     }
682
683     private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
684             IdLocalPool localPoolCache) {
685         for (long idValue : idsList) {
686             localPoolCache.getReleasedIds().addId(idValue);
687         }
688         localPool.put(parentPoolName, localPoolCache);
689     }
690
691     public java.util.Optional<IdLocalPool> getIdLocalPool(String parentPoolName) {
692         return java.util.Optional.ofNullable(localPool.get(parentPoolName)).map(IdLocalPool::deepCopyOf);
693     }
694
695     private List<Long> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
696             CompletableFuture<List<Long>> futureIdValues, CompletableFuture<List<Long>> existingFutureIdValue)
697             throws IdManagerException, ReadFailedException {
698         InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
699         InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
700         idUtils.lock(lockManager, uniqueIdKey);
701         List<Long> newIdValuesList = new ArrayList<>();
702         Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
703         if (existingIdEntry.isPresent()) {
704             newIdValuesList = existingIdEntry.get().getIdValue();
705             LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
706             // Inform other waiting threads about this new value.
707             futureIdValues.complete(newIdValuesList);
708             // This is to avoid stale entries in the map. If this thread had populated the map,
709             // then the entry should be removed.
710             if (existingFutureIdValue == null) {
711                 idUtils.allocatedIdMap.remove(uniqueIdKey);
712             }
713             idUtils.unlock(lockManager, uniqueIdKey);
714             return newIdValuesList;
715         }
716         return newIdValuesList;
717     }
718
719     private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
720             throws IdManagerException, ReadFailedException, OperationFailedException, TransactionCommitFailedException {
721         IdLocalPool localIdPool = localPool.get(parentPoolName);
722         if (localIdPool == null) {
723             idUtils.lock(lockManager, parentPoolName);
724             try {
725                 // Check if a previous thread that got the cluster-wide lock
726                 // first, has created the localPool
727                 if (localPool.get(parentPoolName) == null) {
728                     WriteTransaction tx = broker.newWriteOnlyTransaction();
729                     InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
730                             .getIdPoolInstance(parentPoolName);
731                     IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
732                     // Return localIdPool
733                     localIdPool = createLocalPool(tx, localPoolName, parentIdPool);
734                     tx.submit().checkedGet();
735                 } else {
736                     localIdPool = localPool.get(parentPoolName);
737                 }
738             } finally {
739                 idUtils.unlock(lockManager, parentPoolName);
740             }
741         }
742         return localIdPool;
743     }
744
745     private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Long> newIdValuesList,
746             IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
747         InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
748         IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier1);
749         long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
750                 + localIdPool.getReleasedIds().getAvailableIdCount();
751         AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
752         ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
753         totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
754                 + idUtils.getAvailableIdsCount(availableParentIds);
755         if (totalAvailableIdCount > size) {
756             while (size > 0) {
757                 try {
758                     newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
759                 } catch (OperationFailedException e) {
760                     if (LOG.isDebugEnabled()) {
761                         LOG.debug("Releasing IDs to pool {}", localPoolName);
762                     }
763                     // Releasing the IDs added in newIdValuesList since
764                     // a null list would be returned now, as the
765                     // requested size of list IDs exceeds the number of
766                     // available IDs.
767                     updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
768                 }
769                 newIdValuesList.add(newIdValue);
770                 size--;
771             }
772         } else {
773             throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
774         }
775     }
776 }