2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.idmanager;
10 import static java.util.Comparator.comparing;
11 import static java.util.stream.Collectors.toCollection;
12 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
14 import com.google.common.base.Optional;
15 import com.google.common.util.concurrent.Futures;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.LinkedList;
20 import java.util.List;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import javax.annotation.PostConstruct;
32 import javax.annotation.PreDestroy;
33 import javax.inject.Inject;
34 import javax.inject.Singleton;
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
37 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.daexim.DataImportBootReady;
40 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
41 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
42 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
43 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
44 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
45 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
46 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
47 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
48 import org.opendaylight.genius.infra.FutureRpcResults;
49 import org.opendaylight.genius.infra.FutureRpcResults.LogLevel;
50 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
51 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
52 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.opendaylight.yangtools.yang.common.OperationFailedException;
77 import org.opendaylight.yangtools.yang.common.RpcResult;
78 import org.ops4j.pax.cdi.api.OsgiService;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
83 public class IdManager implements IdManagerService, IdManagerMonitor {
85 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
86 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
88 private final DataBroker broker;
89 private final ManagedNewTransactionRunner txRunner;
90 private final SingleTransactionDataBroker singleTxDB;
91 private final LockManagerService lockManager;
92 private final IdUtils idUtils;
93 private final JobCoordinator jobCoordinator;
95 private final ConcurrentMap<String, IdLocalPool> localPool;
96 private final Timer cleanJobTimer = new Timer();
99 public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
100 @OsgiService DataImportBootReady dataImportBootReady, JobCoordinator jobCoordinator)
101 throws ReadFailedException {
103 this.txRunner = new ManagedNewTransactionRunnerImpl(db);
104 this.singleTxDB = new SingleTransactionDataBroker(db);
105 this.lockManager = lockManager;
106 this.idUtils = idUtils;
107 this.jobCoordinator = jobCoordinator;
109 // NB: We do not "use" the DataImportBootReady, but it's presence in the OSGi
110 // Service Registry is the required "signal" that the Daexim "import on boot"
111 // has fully completed (which we want to wait for). Therefore, making this
112 // dependent on that defers the Blueprint initialization, as we'd like to,
113 // so that we do not start giving out new IDs before an import went in.
114 // Thus, please DO NOT remove the DataImportBootReady argument, even if
115 // it appears to be (is) un-used from a Java code PoV!
117 this.localPool = new ConcurrentHashMap<>();
122 public Map<String, String> getLocalPoolsDetails() {
123 Map<String, String> map = new HashMap<>();
124 localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
129 public void start() {
130 LOG.info("{} start", getClass().getSimpleName());
134 public void close() {
135 cleanJobTimer.cancel();
137 LOG.info("{} close", getClass().getSimpleName());
140 private void populateCache() throws ReadFailedException {
141 // If IP changes during reboot, then there will be orphaned child pools.
142 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
143 Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
144 if (!idPoolsOptional.isPresent()) {
147 IdPools idPools = idPoolsOptional.get();
148 List<IdPool> idPoolList = idPools.getIdPool();
151 .filter(idPool -> idPool.getParentPoolName() != null
152 && !idPool.getParentPoolName().isEmpty()
153 && idUtils.getLocalPoolName(idPool.getParentPoolName())
154 .equals(idPool.getPoolName()))
156 idPool -> updateLocalIdPoolCache(idPool,
157 idPool.getParentPoolName()));
160 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
161 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
162 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
163 availableIdsHolder.getEnd());
164 availableIdHolder.setCur(availableIdsHolder.getCursor());
165 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
166 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
167 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
168 List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
169 List<DelayedIdEntry> delayedIdEntryInCache = delayedEntries
171 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
172 .getId(), delayedIdEntry.getReadyTimeSec()))
173 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
174 .collect(toCollection(ArrayList::new));
176 releasedIdHolder.replaceDelayedEntries(delayedIdEntryInCache);
178 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
179 idLocalPool.setAvailableIds(availableIdHolder);
180 idLocalPool.setReleasedIds(releasedIdHolder);
181 localPool.put(parentPoolName, idLocalPool);
182 if (LOG.isDebugEnabled()) {
183 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
189 public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
190 LOG.info("createIdPool called with input {}", input);
191 long low = input.getLow();
192 long high = input.getHigh();
193 long blockSize = idUtils.computeBlockSize(low, high);
194 return FutureRpcResults.fromListenableFuture(LOG, "createIdPool", input, () -> {
195 String poolName = input.getPoolName().intern();
197 idUtils.lock(lockManager, poolName);
198 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
199 IdPool idPool = createGlobalPool(tx, poolName, low, high, blockSize);
200 String localPoolName = idUtils.getLocalPoolName(poolName);
201 IdLocalPool idLocalPool = localPool.get(poolName);
202 if (idLocalPool == null) {
203 createLocalPool(tx, localPoolName, idPool);
204 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
208 idUtils.unlock(lockManager, poolName);
214 public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
215 String idKey = input.getIdKey();
216 String poolName = input.getPoolName();
217 return FutureRpcResults.fromBuilder(LOG, "allocateId", input, () -> {
218 String localPoolName = idUtils.getLocalPoolName(poolName);
219 // allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
220 long newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
221 return new AllocateIdOutputBuilder().setIdValue(newIdValue);
222 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
225 private void completeExceptionallyIfPresent(String poolName, String idKey, Throwable exception) {
226 CompletableFuture<List<Long>> completableFuture =
227 idUtils.removeAllocatedIds(idUtils.getUniqueKey(poolName, idKey));
228 if (completableFuture != null) {
229 completableFuture.completeExceptionally(exception);
234 public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
235 String idKey = input.getIdKey();
236 String poolName = input.getPoolName();
237 long size = input.getSize();
238 String localPoolName = idUtils.getLocalPoolName(poolName);
239 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
240 return FutureRpcResults.fromBuilder(LOG, "allocateIdRange", input, () -> {
241 List<Long> newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
242 Collections.sort(newIdValuesList);
243 output.setIdValues(newIdValuesList);
245 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
249 public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
250 return FutureRpcResults.fromListenableFuture(LOG, "deleteIdPool", input, () -> {
251 String poolName = input.getPoolName().intern();
252 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
253 synchronized (poolName) {
254 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
255 List<ChildPools> childPoolList = idPool.getChildPools();
256 if (childPoolList != null) {
257 childPoolList.stream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
259 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
261 // TODO return the Future from a TBD asyncDelete instead.. BUT check that all callers @CheckReturnValue
262 return Futures.immediateFuture((Void) null);
267 public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
268 String poolName = input.getPoolName();
269 String idKey = input.getIdKey();
270 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
271 return FutureRpcResults.fromListenableFuture(LOG, "releaseId", input, () -> {
272 idUtils.lock(lockManager, uniqueKey);
273 releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
274 // TODO return the Future from releaseIdFromLocalPool() instead.. check all callers @CheckReturnValue
275 return Futures.immediateFuture((Void) null);
276 }).onFailureLogLevel(LogLevel.NONE).onFailure(e -> {
277 if (e instanceof IdDoesNotExistException) {
278 // Do not log full stack trace in case ID does not exist
279 LOG.error("RPC releaseId() failed due to IdDoesNotExistException; input = {}", input);
281 // But for all other cases do:
282 LOG.error("RPC releaseId() failed; input = {}", input, e);
284 idUtils.unlock(lockManager, uniqueKey);
288 private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
289 String idKey, long size) throws OperationFailedException, IdManagerException {
290 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
291 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
292 CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
293 CompletableFuture<List<Long>> existingFutureIdValue =
294 idUtils.putAllocatedIdsIfAbsent(uniqueIdKey, futureIdValues);
295 if (existingFutureIdValue != null) {
297 return existingFutureIdValue.get();
298 } catch (InterruptedException | ExecutionException e) {
299 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
300 idKey, parentPoolName);
301 throw new IdManagerException(e.getMessage(), e);
305 List<Long> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
307 if (!newIdValuesList.isEmpty()) {
308 return newIdValuesList;
310 //This get will not help in concurrent reads. Hence the same read needs to be done again.
311 IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
312 LOG.debug("Got pool {}", localIdPool);
313 long newIdValue = -1;
314 localPoolName = localPoolName.intern();
316 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
317 newIdValuesList.add(newIdValue);
319 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
321 LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
322 idUtils.putReleaseIdLatch(uniqueIdKey, new CountDownLatch(1));
323 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, txRunner,
324 idUtils, lockManager);
325 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
326 futureIdValues.complete(newIdValuesList);
327 return newIdValuesList;
328 } catch (OperationFailedException | IdManagerException e) {
329 idUtils.unlock(lockManager, uniqueIdKey);
334 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
335 throws OperationFailedException, IdManagerException {
337 IdHolder availableIds = localIdPool.getAvailableIds();
338 if (availableIds != null) {
339 Optional<Long> availableId = availableIds.allocateId();
340 if (availableId.isPresent()) {
341 IdHolderSyncJob poolSyncJob =
342 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(), txRunner,
344 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
345 return availableId.get();
348 IdHolder releasedIds = localIdPool.getReleasedIds();
349 Optional<Long> releasedId = releasedIds.allocateId();
350 if (releasedId.isPresent()) {
351 IdHolderSyncJob poolSyncJob =
352 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), txRunner,
354 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
355 return releasedId.get();
357 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
359 if (LOG.isDebugEnabled()) {
360 LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
362 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
368 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
370 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
371 throws OperationFailedException, IdManagerException {
372 if (LOG.isDebugEnabled()) {
373 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
375 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
376 parentPoolName = parentPoolName.intern();
377 idUtils.lock(lockManager, parentPoolName);
380 // Check if the childpool already got id block.
381 long availableIdCount =
382 localIdPool.getAvailableIds().getAvailableIdCount()
383 + localIdPool.getReleasedIds().getAvailableIdCount();
384 if (availableIdCount > 0) {
385 return availableIdCount;
387 WriteTransaction tx = broker.newWriteOnlyTransaction();
388 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
389 idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
390 tx.submit().checkedGet();
391 } catch (IdManagerException | NullPointerException e) {
392 LOG.error("Error getting id block from parent pool", e);
394 idUtils.unlock(lockManager, parentPoolName);
399 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
400 throws OperationFailedException, IdManagerException {
402 ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
404 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
408 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
412 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
414 if (LOG.isDebugEnabled()) {
415 LOG.debug("Unable to allocate Id block from global pool");
417 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
422 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
423 throws OperationFailedException {
424 List<ChildPools> childPoolsList = parentIdPool.getChildPools();
425 // Sorting the child pools on last accessed time so that the pool that
426 // was not accessed for a long time comes first.
427 Collections.sort(childPoolsList, comparing(ChildPools::getLastAccessTime));
428 long currentTime = System.currentTimeMillis() / 1000;
429 for (ChildPools childPools : childPoolsList) {
430 if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
433 if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
434 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
435 .getIdPoolInstance(childPools.getChildPoolName());
436 IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
437 ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
439 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
440 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
441 if (delayedIdEntriesParent == null) {
442 delayedIdEntriesParent = new LinkedList<>();
444 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
445 delayedIdEntriesChild.clear();
447 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
448 while (idUtils.isIdAvailable(availableIds)) {
449 long cursor = availableIds.getCursor() + 1;
450 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
451 availableIds.setCursor(cursor);
454 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
455 + idUtils.getAvailableIdsCount(availableIds);
456 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
457 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
458 singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
459 new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
460 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
462 return totalAvailableIdCount;
468 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
469 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
470 if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
471 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
474 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
475 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
476 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
477 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
478 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
479 List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
481 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
482 .getId(), delayedIdEntry.getReadyTimeSec()))
483 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
484 .collect(toCollection(ArrayList::new));
485 delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
486 releasedIds.replaceDelayedEntries(delayedIdEntriesFromParentPool);
487 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
488 localIdPool.setReleasedIds(releasedIds);
489 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
490 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
491 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
492 .builder(IdPools.class).child(IdPool.class,
493 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
494 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
495 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
496 tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
497 releasedIdsBuilderParent.build(), true);
501 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
502 WriteTransaction tx) {
504 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
505 long end = availableIdsBuilderParent.getEnd();
506 long cur = availableIdsBuilderParent.getCursor();
507 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
508 if (LOG.isDebugEnabled()) {
509 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
513 // Update availableIdsHolder of Local Pool
514 idCount = Math.min(end - cur, parentIdPool.getBlockSize());
515 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
516 localIdPool.setAvailableIds(availableIds);
517 // Update availableIdsHolder of Global Pool
518 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
519 .builder(IdPools.class).child(IdPool.class,
520 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
521 availableIdsBuilderParent.setCursor(cur + idCount);
522 if (LOG.isDebugEnabled()) {
523 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
525 tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
526 availableIdsBuilderParent.build(), true);
530 private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
531 throws ReadFailedException, IdManagerException {
532 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
533 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
534 CountDownLatch latch = idUtils.getReleaseIdLatch(idLatchKey);
537 if (!latch.await(10, TimeUnit.SECONDS)) {
538 LOG.warn("Timed out while releasing id {} from id pool {}", idKey, parentPoolName);
540 } catch (InterruptedException ignored) {
541 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
543 idUtils.removeReleaseIdLatch(idLatchKey);
546 localPoolName = localPoolName.intern();
547 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
548 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
549 List<IdEntries> idEntries = parentIdPool.getIdEntries();
550 List<IdEntries> newIdEntries = idEntries;
551 if (idEntries == null) {
552 throw new IdDoesNotExistException(parentPoolName, idKey);
554 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
555 Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
556 if (!existingIdEntryObject.isPresent()) {
557 LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
558 idUtils.unlock(lockManager, idLatchKey);
561 IdEntries existingIdEntry = existingIdEntryObject.get();
562 List<Long> idValuesList = existingIdEntry.getIdValue();
563 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
564 boolean isRemoved = newIdEntries.remove(existingIdEntry);
565 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
566 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
567 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), txRunner,
569 jobCoordinator.enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
570 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
571 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
572 // Updating id entries in the parent pool. This will be used for restart scenario
573 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, txRunner, idUtils,
575 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
578 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
579 final String parentPoolName, final int blockSize) {
580 TimerTask scheduledTask = new TimerTask() {
584 new CleanUpJob(localIdPoolCache, txRunner, broker, parentPoolName, blockSize, lockManager,
585 idUtils, jobCoordinator);
586 jobCoordinator.enqueueJob(localIdPoolCache.getPoolName(), job, IdUtils.RETRY_COUNT);
589 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
592 private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
593 throws ReadFailedException {
595 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
596 Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
597 if (!existingIdPool.isPresent()) {
598 if (LOG.isDebugEnabled()) {
599 LOG.debug("Creating new global pool {}", poolName);
601 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
602 tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
604 idPool = existingIdPool.get();
605 if (LOG.isDebugEnabled()) {
606 LOG.debug("GlobalPool exists {}", idPool);
612 private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
613 throws OperationFailedException, IdManagerException {
614 localPoolName = localPoolName.intern();
615 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
616 allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
617 String parentPool = idPool.getPoolName();
618 localPool.put(parentPool, idLocalPool);
619 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, txRunner, idPool.getPoolName(),
620 idPool.getBlockSize(), idUtils);
621 jobCoordinator.enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
625 private void deletePool(String poolName) {
626 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, txRunner, idUtils);
627 jobCoordinator.enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
630 public void poolDeleted(String parentPoolName, String poolName) {
631 IdLocalPool idLocalPool = localPool.get(parentPoolName);
632 if (idLocalPool != null) {
633 if (idLocalPool.getPoolName().equals(poolName)) {
634 localPool.remove(parentPoolName);
639 private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
640 IdLocalPool localPoolCache) {
641 for (long idValue : idsList) {
642 localPoolCache.getReleasedIds().addId(idValue);
644 localPool.put(parentPoolName, localPoolCache);
647 public java.util.Optional<IdLocalPool> getIdLocalPool(String parentPoolName) {
648 return java.util.Optional.ofNullable(localPool.get(parentPoolName)).map(IdLocalPool::deepCopyOf);
651 private List<Long> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
652 CompletableFuture<List<Long>> futureIdValues, boolean hasExistingFutureIdValues)
653 throws IdManagerException, ReadFailedException {
654 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
655 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
656 idUtils.lock(lockManager, uniqueIdKey);
657 List<Long> newIdValuesList = new ArrayList<>();
658 Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
659 if (existingIdEntry.isPresent()) {
660 newIdValuesList = existingIdEntry.get().getIdValue();
661 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
662 // Inform other waiting threads about this new value.
663 futureIdValues.complete(newIdValuesList);
664 // This is to avoid stale entries in the map. If this thread had populated the map,
665 // then the entry should be removed.
666 if (!hasExistingFutureIdValues) {
667 idUtils.removeAllocatedIds(uniqueIdKey);
669 idUtils.unlock(lockManager, uniqueIdKey);
670 return newIdValuesList;
672 return newIdValuesList;
675 private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
676 throws IdManagerException, ReadFailedException, OperationFailedException, TransactionCommitFailedException {
677 IdLocalPool localIdPool = localPool.get(parentPoolName);
678 if (localIdPool == null) {
679 idUtils.lock(lockManager, parentPoolName);
681 // Check if a previous thread that got the cluster-wide lock
682 // first, has created the localPool
683 if (localPool.get(parentPoolName) == null) {
684 WriteTransaction tx = broker.newWriteOnlyTransaction();
685 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
686 .getIdPoolInstance(parentPoolName);
687 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
688 // Return localIdPool
689 localIdPool = createLocalPool(tx, localPoolName, parentIdPool);
690 tx.submit().checkedGet();
692 localIdPool = localPool.get(parentPoolName);
695 idUtils.unlock(lockManager, parentPoolName);
701 private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Long> newIdValuesList,
702 IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
703 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
704 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier1);
705 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
706 + localIdPool.getReleasedIds().getAvailableIdCount();
707 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
708 ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
709 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
710 + idUtils.getAvailableIdsCount(availableParentIds);
711 if (totalAvailableIdCount > size) {
714 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
715 } catch (OperationFailedException e) {
716 if (LOG.isDebugEnabled()) {
717 LOG.debug("Releasing IDs to pool {}", localPoolName);
719 // Releasing the IDs added in newIdValuesList since
720 // a null list would be returned now, as the
721 // requested size of list IDs exceeds the number of
723 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
725 newIdValuesList.add(newIdValue);
729 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));