/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.genius.idmanager;
11 import static java.util.Comparator.comparing;
12 import static java.util.stream.Collectors.toCollection;
13 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
40 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
41 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
42 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
43 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
44 import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
45 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
46 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
47 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
48 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
49 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
50 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
51 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
52 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.opendaylight.yangtools.yang.common.OperationFailedException;
77 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
84 public class IdManager implements IdManagerService, IdManagerMonitor {
86 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
87 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
89 private final DataBroker broker;
90 private final SingleTransactionDataBroker singleTxDB;
91 private final LockManagerService lockManager;
92 private final IdUtils idUtils;
94 private final ConcurrentMap<String, IdLocalPool> localPool;
95 private final Timer cleanJobTimer = new Timer();
/**
 * Creates the ID manager and fills the local pool cache from the CONFIGURATION datastore.
 *
 * @param db data broker used for write transactions and wrapped in a
 *           {@link SingleTransactionDataBroker} for synchronous reads and writes
 * @param lockManager lock service passed to {@code idUtils.lock}/{@code idUtils.unlock}
 * @param idUtils helper supplying instance identifiers, key names and locking
 * @throws ReadFailedException if the existing pools cannot be read while populating the cache
 */
@Inject
public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils) throws ReadFailedException {
    this.broker = db;
    this.singleTxDB = new SingleTransactionDataBroker(db);
    this.lockManager = lockManager;
    this.idUtils = idUtils;
    this.localPool = new ConcurrentHashMap<>();
    populateCache();
}
108 public Map<String, String> getLocalPoolsDetails() {
109 Map<String, String> map = new HashMap<>();
110 localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
115 public void start() {
116 LOG.info("{} start", getClass().getSimpleName());
120 public void close() {
121 LOG.info("{} close", getClass().getSimpleName());
124 private void populateCache() throws ReadFailedException {
125 // If IP changes during reboot, then there will be orphaned child pools.
126 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
127 Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
128 if (!idPoolsOptional.isPresent()) {
131 IdPools idPools = idPoolsOptional.get();
132 List<IdPool> idPoolList = idPools.getIdPool();
135 .filter(idPool -> idPool.getParentPoolName() != null
136 && !idPool.getParentPoolName().isEmpty()
137 && idUtils.getLocalPoolName(idPool.getParentPoolName())
138 .equals(idPool.getPoolName()))
140 idPool -> updateLocalIdPoolCache(idPool,
141 idPool.getParentPoolName()));
144 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
145 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
146 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
147 availableIdsHolder.getEnd());
148 availableIdHolder.setCur(availableIdsHolder.getCursor());
149 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
150 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
151 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
152 List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
153 List<DelayedIdEntry> delayedIdEntryInCache = new CopyOnWriteArrayList<>();
154 if (delayedEntries != null) {
155 delayedIdEntryInCache = delayedEntries
157 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
158 .getId(), delayedIdEntry.getReadyTimeSec()))
159 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
160 .collect(toCollection(CopyOnWriteArrayList::new));
162 releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
164 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
165 idLocalPool.setAvailableIds(availableIdHolder);
166 idLocalPool.setReleasedIds(releasedIdHolder);
167 localPool.put(parentPoolName, idLocalPool);
168 if (LOG.isDebugEnabled()) {
169 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
175 public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
176 LOG.info("createIdPool called with input {}", input);
177 String poolName = input.getPoolName();
178 long low = input.getLow();
179 long high = input.getHigh();
180 long blockSize = idUtils.computeBlockSize(low, high);
181 Future<RpcResult<Void>> futureResult;
183 idUtils.lock(lockManager, poolName);
184 WriteTransaction tx = broker.newWriteOnlyTransaction();
185 poolName = poolName.intern();
187 idPool = createGlobalPool(tx, poolName, low, high, blockSize);
188 String localPoolName = idUtils.getLocalPoolName(poolName);
189 IdLocalPool idLocalPool = localPool.get(poolName);
190 if (idLocalPool == null) {
191 createLocalPool(tx, localPoolName, idPool);
192 idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
194 tx.submit().checkedGet();
195 futureResult = RpcResultBuilder.<Void>success().buildFuture();
196 } catch (OperationFailedException | IdManagerException e) {
197 futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
199 idUtils.unlock(lockManager, poolName);
205 public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
206 LOG.debug("AllocateId called with input {}", input);
207 String idKey = input.getIdKey();
208 String poolName = input.getPoolName();
209 String localPoolName = idUtils.getLocalPoolName(poolName);
210 long newIdValue = -1;
211 AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
212 Future<RpcResult<AllocateIdOutput>> futureResult;
214 //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
215 newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
216 output.setIdValue(newIdValue);
217 futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
218 } catch (OperationFailedException | IdManagerException e) {
219 completeExceptionallyIfPresent(poolName, idKey, e);
220 futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
225 private void completeExceptionallyIfPresent(String poolName, String idKey, Exception exception) {
226 CompletableFuture<List<Long>> completableFuture =
227 idUtils.allocatedIdMap.remove(idUtils.getUniqueKey(poolName, idKey));
228 if (completableFuture != null) {
229 completableFuture.completeExceptionally(exception);
234 public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
235 if (LOG.isDebugEnabled()) {
236 LOG.debug("AllocateIdRange called with input {}", input);
238 String idKey = input.getIdKey();
239 String poolName = input.getPoolName();
240 long size = input.getSize();
241 String localPoolName = idUtils.getLocalPoolName(poolName);
242 List<Long> newIdValuesList = new ArrayList<>();
243 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
244 Future<RpcResult<AllocateIdRangeOutput>> futureResult;
246 newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
247 Collections.sort(newIdValuesList);
248 output.setIdValues(newIdValuesList);
249 futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
250 } catch (OperationFailedException | IdManagerException e) {
251 completeExceptionallyIfPresent(poolName, idKey, e);
252 futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
258 public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
259 if (LOG.isDebugEnabled()) {
260 LOG.debug("DeleteIdPool called with input {}", input);
262 String poolName = input.getPoolName();
263 Future<RpcResult<Void>> futureResult;
265 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
266 poolName = poolName.intern();
267 synchronized (poolName) {
268 IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
269 List<ChildPools> childPoolList = idPool.getChildPools();
270 if (childPoolList != null) {
271 childPoolList.stream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
273 singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
274 if (LOG.isDebugEnabled()) {
275 LOG.debug("Deleted id pool {}", poolName);
278 futureResult = RpcResultBuilder.<Void>success().buildFuture();
279 } catch (OperationFailedException e) {
280 futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
286 public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
287 String poolName = input.getPoolName();
288 String idKey = input.getIdKey();
289 LOG.info("Releasing ID {} from pool {}", idKey, poolName);
290 Future<RpcResult<Void>> futureResult;
291 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
293 idUtils.lock(lockManager, uniqueKey);
294 releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
295 futureResult = RpcResultBuilder.<Void>success().buildFuture();
296 } catch (ReadFailedException | IdManagerException e) {
297 futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
298 idUtils.unlock(lockManager, uniqueKey);
303 private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
304 if (exception instanceof IdDoesNotExistException) {
305 // Do not log full stack trace in case ID does not exist
306 LOG.error(msg + " : " + exception.getMessage());
308 LOG.error(msg, exception);
310 RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
311 failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
312 if (exception instanceof OperationFailedException) {
313 failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
315 return failedRpcResultBuilder.buildFuture();
318 private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
319 String idKey, long size) throws OperationFailedException, IdManagerException {
320 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName,
322 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
323 CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
324 CompletableFuture<List<Long>> existingFutureIdValue =
325 idUtils.allocatedIdMap.putIfAbsent(uniqueIdKey, futureIdValues);
326 if (existingFutureIdValue != null) {
328 return existingFutureIdValue.get();
329 } catch (InterruptedException | ExecutionException e) {
330 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
331 idKey, parentPoolName);
332 throw new IdManagerException(e.getMessage(), e);
336 List<Long> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
337 existingFutureIdValue);
338 if (!newIdValuesList.isEmpty()) {
339 return newIdValuesList;
341 //This get will not help in concurrent reads. Hence the same read needs to be done again.
342 IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
343 LOG.info("Got pool {}", localIdPool);
344 long newIdValue = -1;
345 localPoolName = localPoolName.intern();
347 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
348 newIdValuesList.add(newIdValue);
350 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
352 LOG.info("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
353 idUtils.releaseIdLatchMap.put(uniqueIdKey, new CountDownLatch(1));
354 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
355 idUtils, lockManager);
356 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
357 futureIdValues.complete(newIdValuesList);
358 return newIdValuesList;
359 } catch (OperationFailedException | IdManagerException e) {
360 idUtils.unlock(lockManager, uniqueIdKey);
365 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
366 throws OperationFailedException, IdManagerException {
368 IdHolder releasedIds = localIdPool.getReleasedIds();
369 Optional<Long> releasedId = Optional.absent();
370 releasedId = releasedIds.allocateId();
371 if (releasedId.isPresent()) {
372 IdHolderSyncJob poolSyncJob =
373 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), broker,
375 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
376 poolSyncJob, IdUtils.RETRY_COUNT);
377 return releasedId.get();
379 Optional<Long> availableId = Optional.absent();
380 IdHolder availableIds = localIdPool.getAvailableIds();
381 if (availableIds != null) {
382 availableId = availableIds.allocateId();
383 if (availableId.isPresent()) {
384 IdHolderSyncJob poolSyncJob =
385 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(),
387 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(),
388 poolSyncJob, IdUtils.RETRY_COUNT);
389 return availableId.get();
392 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
394 if (LOG.isDebugEnabled()) {
395 LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
397 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
403 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
405 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
406 throws OperationFailedException, IdManagerException {
407 if (LOG.isDebugEnabled()) {
408 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
410 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
411 parentPoolName = parentPoolName.intern();
412 idUtils.lock(lockManager, parentPoolName);
415 // Check if the childpool already got id block.
416 long availableIdCount =
417 localIdPool.getAvailableIds().getAvailableIdCount()
418 + localIdPool.getReleasedIds().getAvailableIdCount();
419 if (availableIdCount > 0) {
420 return availableIdCount;
422 WriteTransaction tx = broker.newWriteOnlyTransaction();
423 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
424 idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
425 tx.submit().checkedGet();
426 } catch (IdManagerException | NullPointerException e) {
427 LOG.error("Error getting id block from parent pool. {}", e.getMessage());
429 idUtils.unlock(lockManager, parentPoolName);
434 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
435 throws OperationFailedException, IdManagerException {
437 ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
439 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
443 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
447 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
449 if (LOG.isDebugEnabled()) {
450 LOG.debug("Unable to allocate Id block from global pool");
452 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
457 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
458 throws OperationFailedException {
459 List<ChildPools> childPoolsList = parentIdPool.getChildPools();
460 // Sorting the child pools on last accessed time so that the pool that
461 // was not accessed for a long time comes first.
462 Collections.sort(childPoolsList, comparing(ChildPools::getLastAccessTime));
463 long currentTime = System.currentTimeMillis() / 1000;
464 for (ChildPools childPools : childPoolsList) {
465 if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
468 if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
469 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
470 .getIdPoolInstance(childPools.getChildPoolName());
471 IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
472 ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
474 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
475 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
476 if (delayedIdEntriesParent == null) {
477 delayedIdEntriesParent = new LinkedList<>();
479 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
480 delayedIdEntriesChild.removeAll(delayedIdEntriesChild);
482 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
483 while (idUtils.isIdAvailable(availableIds)) {
484 long cursor = availableIds.getCursor() + 1;
485 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
486 availableIds.setCursor(cursor);
489 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
490 + idUtils.getAvailableIdsCount(availableIds);
491 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
492 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
493 singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
494 new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
495 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
497 return totalAvailableIdCount;
503 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
504 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
505 if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
506 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
509 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
510 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
511 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
512 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
513 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
514 List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
516 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
517 .getId(), delayedIdEntry.getReadyTimeSec()))
518 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
519 .collect(toCollection(CopyOnWriteArrayList::new));
520 delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
521 releasedIds.setDelayedEntries(delayedIdEntriesFromParentPool);
522 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
523 localIdPool.setReleasedIds(releasedIds);
524 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
525 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
526 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
527 .builder(IdPools.class).child(IdPool.class,
528 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
529 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
530 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
531 tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
532 releasedIdsBuilderParent.build(), true);
536 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
537 WriteTransaction tx) {
539 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
540 long end = availableIdsBuilderParent.getEnd();
541 long cur = availableIdsBuilderParent.getCursor();
542 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
543 if (LOG.isDebugEnabled()) {
544 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
548 // Update availableIdsHolder of Local Pool
549 idCount = Math.min(end - cur, parentIdPool.getBlockSize());
550 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
551 localIdPool.setAvailableIds(availableIds);
552 // Update availableIdsHolder of Global Pool
553 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
554 .builder(IdPools.class).child(IdPool.class,
555 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
556 availableIdsBuilderParent.setCursor(cur + idCount);
557 if (LOG.isDebugEnabled()) {
558 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
560 tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
561 availableIdsBuilderParent.build(), true);
565 private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
566 throws ReadFailedException, IdManagerException {
567 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
568 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
569 CountDownLatch latch = idUtils.releaseIdLatchMap.get(idLatchKey);
572 latch.await(10, TimeUnit.SECONDS);
573 } catch (InterruptedException ignored) {
574 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
576 idUtils.releaseIdLatchMap.remove(idLatchKey);
579 localPoolName = localPoolName.intern();
580 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
581 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
582 List<IdEntries> idEntries = parentIdPool.getIdEntries();
583 List<IdEntries> newIdEntries = idEntries;
584 if (idEntries == null) {
585 throw new IdDoesNotExistException(parentPoolName, idKey);
587 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
588 Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
589 if (!existingIdEntryObject.isPresent()) {
590 LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
591 idUtils.unlock(lockManager, idLatchKey);
594 IdEntries existingIdEntry = existingIdEntryObject.get();
595 List<Long> idValuesList = existingIdEntry.getIdValue();
596 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
597 boolean isRemoved = newIdEntries.remove(existingIdEntry);
598 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
599 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
600 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
602 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
603 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
604 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
605 // Updating id entries in the parent pool. This will be used for restart scenario
606 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils,
608 DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
611 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
612 final String parentPoolName, final int blockSize) {
613 TimerTask scheduledTask = new TimerTask() {
616 CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
618 DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
619 IdUtils.RETRY_COUNT);
622 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
625 private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
626 throws ReadFailedException {
628 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
629 Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
630 if (!existingIdPool.isPresent()) {
631 if (LOG.isDebugEnabled()) {
632 LOG.debug("Creating new global pool {}", poolName);
634 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
635 tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
637 idPool = existingIdPool.get();
638 if (LOG.isDebugEnabled()) {
639 LOG.debug("GlobalPool exists {}", idPool);
645 private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
646 throws OperationFailedException, IdManagerException {
647 localPoolName = localPoolName.intern();
648 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
649 allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
650 String parentPool = idPool.getPoolName();
651 localPool.put(parentPool, idLocalPool);
652 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
653 idPool.getBlockSize(), idUtils);
654 DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
658 private void deletePool(String poolName) {
659 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
660 DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
663 public void poolDeleted(String parentPoolName, String poolName) {
664 IdLocalPool idLocalPool = localPool.get(parentPoolName);
665 if (idLocalPool != null) {
666 if (idLocalPool.getPoolName().equals(poolName)) {
667 localPool.remove(parentPoolName);
672 private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
673 IdLocalPool localPoolCache) {
674 for (long idValue : idsList) {
675 localPoolCache.getReleasedIds().addId(idValue);
677 localPool.put(parentPoolName, localPoolCache);
680 public java.util.Optional<IdLocalPool> getIdLocalPool(String parentPoolName) {
681 return java.util.Optional.ofNullable(localPool.get(parentPoolName))
682 .map(localPool -> localPool.deepCopyOf());
685 private List<Long> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
686 CompletableFuture<List<Long>> futureIdValues, CompletableFuture<List<Long>> existingFutureIdValue)
687 throws IdManagerException, ReadFailedException {
688 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
689 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
690 idUtils.lock(lockManager, uniqueIdKey);
691 List<Long> newIdValuesList = new ArrayList<>();
692 Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
693 if (existingIdEntry.isPresent()) {
694 newIdValuesList = existingIdEntry.get().getIdValue();
695 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
696 // Inform other waiting threads about this new value.
697 futureIdValues.complete(newIdValuesList);
698 // This is to avoid stale entries in the map. If this thread had populated the map,
699 // then the entry should be removed.
700 if (existingFutureIdValue == null) {
701 idUtils.allocatedIdMap.remove(uniqueIdKey);
703 idUtils.unlock(lockManager, uniqueIdKey);
704 return newIdValuesList;
706 return newIdValuesList;
709 private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
710 throws IdManagerException, ReadFailedException, OperationFailedException, TransactionCommitFailedException {
711 IdLocalPool localIdPool = localPool.get(parentPoolName);
712 if (localIdPool == null) {
713 idUtils.lock(lockManager, parentPoolName);
715 // Check if a previous thread that got the cluster-wide lock
716 // first, has created the localPool
717 if (localPool.get(parentPoolName) == null) {
718 WriteTransaction tx = broker.newWriteOnlyTransaction();
719 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
720 .getIdPoolInstance(parentPoolName);
721 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
722 // Return localIdPool
723 localIdPool = createLocalPool(tx, localPoolName, parentIdPool);
724 tx.submit().checkedGet();
726 localIdPool = localPool.get(parentPoolName);
729 idUtils.unlock(lockManager, parentPoolName);
735 private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Long> newIdValuesList,
736 IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
737 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
738 IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier1);
739 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
740 + localIdPool.getReleasedIds().getAvailableIdCount();
741 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
742 ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
743 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
744 + idUtils.getAvailableIdsCount(availableParentIds);
745 if (totalAvailableIdCount > size) {
748 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
749 } catch (OperationFailedException e) {
750 if (LOG.isDebugEnabled()) {
751 LOG.debug("Releasing IDs to pool {}", localPoolName);
753 // Releasing the IDs added in newIdValuesList since
754 // a null list would be returned now, as the
755 // requested size of list IDs exceeds the number of
757 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
759 newIdValuesList.add(newIdValue);
763 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));