/*
 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.idmanager;
10 import static java.util.Comparator.comparing;
11 import static java.util.stream.Collectors.toCollection;
12 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
13 import static org.opendaylight.yangtools.yang.binding.CodeHelpers.nonnull;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.stream.Collectors;
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
39 import org.apache.aries.blueprint.annotation.service.Reference;
40 import org.eclipse.jdt.annotation.Nullable;
41 import org.opendaylight.daexim.DataImportBootReady;
42 import org.opendaylight.genius.datastoreutils.ExpectedDataObjectNotFoundException;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
45 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
46 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
47 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
48 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
49 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
50 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
51 import org.opendaylight.genius.infra.Datastore.Configuration;
52 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
53 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
54 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
55 import org.opendaylight.genius.infra.TypedWriteTransaction;
56 import org.opendaylight.genius.mdsalutil.NwConstants;
57 import org.opendaylight.genius.mdsalutil.interfaces.ShardStatusMonitor;
58 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
59 import org.opendaylight.mdsal.binding.api.DataBroker;
60 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
61 import org.opendaylight.mdsal.common.api.ReadFailedException;
62 import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutputBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPoolsKey;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntriesKey;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
92 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
93 import org.opendaylight.yangtools.yang.common.OperationFailedException;
94 import org.opendaylight.yangtools.yang.common.RpcResult;
95 import org.opendaylight.yangtools.yang.common.Uint32;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
100 public class IdManager implements IdManagerService, IdManagerMonitor {
102 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
103 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
105 private final DataBroker broker;
106 private final ManagedNewTransactionRunner txRunner;
107 private final SingleTransactionDataBroker singleTxDB;
108 private final LockManagerService lockManager;
109 private final IdUtils idUtils;
110 private final JobCoordinator jobCoordinator;
112 private final ConcurrentMap<String, IdLocalPool> localPool;
113 private final Timer cleanJobTimer = new Timer();
116 public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
117 @Reference DataImportBootReady dataImportBootReady,
118 @Reference JobCoordinator jobCoordinator, @Reference ShardStatusMonitor shardStatusMonitor)
119 throws ReadFailedException, InterruptedException {
121 this.txRunner = new ManagedNewTransactionRunnerImpl(db);
122 this.singleTxDB = new SingleTransactionDataBroker(db);
123 this.lockManager = lockManager;
124 this.idUtils = idUtils;
125 this.jobCoordinator = jobCoordinator;
127 // NB: We do not "use" the DataImportBootReady, but it's presence in the OSGi
128 // Service Registry is the required "signal" that the Daexim "import on boot"
129 // has fully completed (which we want to wait for). Therefore, making this
130 // dependent on that defers the Blueprint initialization, as we'd like to,
131 // so that we do not start giving out new IDs before an import went in.
132 // Thus, please DO NOT remove the DataImportBootReady argument, even if
133 // it appears to be (is) un-used from a Java code PoV!
135 this.localPool = new ConcurrentHashMap<>();
136 boolean isDatastoreAvailable = false;
138 int totalRetry = 1000;
140 while (retryCount < totalRetry) {
141 isDatastoreAvailable = shardStatusMonitor.getShardStatus(NwConstants.IdManagerShards.getShardList());
142 if (isDatastoreAvailable) {
143 LOG.info("IDManager is UP");
147 LOG.error("IdManager: retrying shard status check for the {} time, pending retries {}",
148 ++retryCount, totalRetry - retryCount);
151 } catch (InterruptedException e) {
152 LOG.error("IDManager is DOWN, shard status check failed");
155 if (!isDatastoreAvailable) {
156 LOG.error("IDManager is DOWN, as shards were not available at bundle bringup");
161 public Map<String, String> getLocalPoolsDetails() {
162 Map<String, String> map = new HashMap<>();
163 localPool.forEach((key, value) -> map.put(key, value.toString()));
168 public void start() {
169 LOG.info("{} start", getClass().getSimpleName());
173 public void close() {
174 cleanJobTimer.cancel();
176 LOG.info("{} close", getClass().getSimpleName());
179 private void populateCache() throws InterruptedException {
180 // If IP changes during reboot, then there will be orphaned child pools.
181 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
182 Optional<IdPools> idPoolsOptional;
185 idPoolsOptional = singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, idPoolsInstance);
187 } catch (ExecutionException e) {
188 LOG.error("Failed to read the id pools due to error. Retrying again...", e);
192 if (!idPoolsOptional.isPresent()) {
195 idPoolsOptional.get().nonnullIdPool().values()
197 .filter(idPool -> idPool.getParentPoolName() != null
198 && !idPool.getParentPoolName().isEmpty()
199 && idUtils.getLocalPoolName(idPool.getParentPoolName())
200 .equals(idPool.getPoolName()))
202 idPool -> updateLocalIdPoolCache(idPool,
203 idPool.getParentPoolName()));
206 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
207 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
208 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart().toJava(),
209 availableIdsHolder.getEnd().toJava());
210 availableIdHolder.setCur(availableIdsHolder.getCursor());
211 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
212 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils,
213 releasedIdsHolder.getDelayedTimeSec().toJava());
214 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount().toJava());
215 List<DelayedIdEntry> delayedIdEntryInCache = releasedIdsHolder.nonnullDelayedIdEntries()
217 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
218 .getId().toJava(), delayedIdEntry.getReadyTimeSec().toJava()))
219 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
220 .collect(toCollection(ArrayList::new));
222 releasedIdHolder.replaceDelayedEntries(delayedIdEntryInCache);
224 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
225 idLocalPool.setAvailableIds(availableIdHolder);
226 idLocalPool.setReleasedIds(releasedIdHolder);
227 localPool.put(parentPoolName, idLocalPool);
228 if (LOG.isDebugEnabled()) {
229 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
235 public ListenableFuture<RpcResult<CreateIdPoolOutput>> createIdPool(CreateIdPoolInput input) {
236 LOG.info("createIdPool called with input {}", input);
237 long low = input.getLow().toJava();
238 long high = input.getHigh().toJava();
239 long blockSize = idUtils.computeBlockSize(low, high);
240 return FutureRpcResults.fromListenableFuture(LOG, "createIdPool", input, () -> {
241 String poolName = input.getPoolName().intern();
243 idUtils.lock(lockManager, poolName);
244 return Futures.transform(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
245 IdPool idPool = createGlobalPool(confTx, poolName, low, high, blockSize);
246 String localPoolName = idUtils.getLocalPoolName(poolName);
247 IdLocalPool idLocalPool = localPool.get(poolName);
248 if (idLocalPool == null) {
249 createLocalPool(confTx, localPoolName, idPool);
250 idUtils.updateChildPool(confTx, idPool.getPoolName(), localPoolName);
252 }), unused -> new CreateIdPoolOutputBuilder().build(), MoreExecutors.directExecutor());
254 idUtils.unlock(lockManager, poolName);
260 public ListenableFuture<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
261 String idKey = input.getIdKey();
262 String poolName = input.getPoolName();
263 return FutureRpcResults.fromBuilder(LOG, "allocateId", input, () -> {
264 String localPoolName = idUtils.getLocalPoolName(poolName);
265 // allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
266 long newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0).toJava();
267 return new AllocateIdOutputBuilder().setIdValue(newIdValue);
268 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
271 private void completeExceptionallyIfPresent(String poolName, String idKey, Throwable exception) {
272 CompletableFuture<List<Uint32>> completableFuture =
273 idUtils.removeAllocatedIds(idUtils.getUniqueKey(poolName, idKey));
274 if (completableFuture != null) {
275 completableFuture.completeExceptionally(exception);
280 public ListenableFuture<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
281 String idKey = input.getIdKey();
282 String poolName = input.getPoolName();
283 long size = input.getSize().toJava();
284 String localPoolName = idUtils.getLocalPoolName(poolName);
285 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
286 return FutureRpcResults.fromBuilder(LOG, "allocateIdRange", input, () -> {
287 List<Uint32> newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
288 Collections.sort(newIdValuesList);
289 output.setIdValues(newIdValuesList);
291 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
295 public ListenableFuture<RpcResult<DeleteIdPoolOutput>> deleteIdPool(DeleteIdPoolInput input) {
296 return FutureRpcResults.fromListenableFuture(LOG, "deleteIdPool", input, () -> {
297 String poolName = input.getPoolName().intern();
298 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
299 synchronized (poolName) {
300 IdPool idPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
301 //List<ChildPools> childPoolList = idPool.getChildPools();
304 @Nullable Map<ChildPoolsKey, ChildPools> childPoolList = idPool.getChildPools();
305 if (childPoolList != null) {
306 childPoolList.forEach((childPool, childPools) -> {
307 deletePool(childPool.getChildPoolName());
310 singleTxDB.syncDelete(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
312 // TODO return the Future from a TBD asyncDelete instead.. BUT check that all callers @CheckReturnValue
313 return Futures.immediateFuture((DeleteIdPoolOutput) null);
318 public ListenableFuture<RpcResult<ReleaseIdOutput>> releaseId(ReleaseIdInput input) {
319 String poolName = input.getPoolName();
320 String idKey = input.getIdKey();
321 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
322 return FutureRpcResults.fromBuilder(LOG, "releaseId", input, () -> {
323 idUtils.lock(lockManager, uniqueKey);
324 return releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
325 }).onFailureLogLevel(FutureRpcResults.LogLevel.NONE)
327 if (e instanceof IdDoesNotExistException) {
328 // Do not log full stack trace in case ID does not exist
329 LOG.error("RPC releaseId() failed due to IdDoesNotExistException; input = {}", input);
331 // But for all other cases do:
332 LOG.error("RPC releaseId() failed; input = {}", input, e);
334 idUtils.unlock(lockManager, uniqueKey);
338 private List<Uint32> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
339 String idKey, long size) throws OperationFailedException, IdManagerException, ExecutionException,
340 InterruptedException {
341 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
342 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
343 CompletableFuture<List<Uint32>> futureIdValues = new CompletableFuture<>();
344 CompletableFuture<List<Uint32>> existingFutureIdValue =
345 idUtils.putAllocatedIdsIfAbsent(uniqueIdKey, futureIdValues);
346 if (existingFutureIdValue != null) {
348 return existingFutureIdValue.get();
349 } catch (InterruptedException | ExecutionException e) {
350 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
351 idKey, parentPoolName);
352 throw new IdManagerException(e.getMessage(), e);
356 List<Uint32> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
358 if (!newIdValuesList.isEmpty()) {
359 return newIdValuesList;
361 //This get will not help in concurrent reads. Hence the same read needs to be done again.
362 IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
363 LOG.debug("Got pool {}", localIdPool);
364 long newIdValue = -1;
365 localPoolName = localPoolName.intern();
367 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
368 newIdValuesList.add(Uint32.valueOf(newIdValue));
370 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
372 LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
373 idUtils.putReleaseIdLatch(uniqueIdKey, new CountDownLatch(1));
374 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, txRunner,
375 idUtils, lockManager);
376 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
377 futureIdValues.complete(newIdValuesList);
378 return newIdValuesList;
379 } catch (OperationFailedException | IdManagerException e) {
380 idUtils.unlock(lockManager, uniqueIdKey);
385 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
386 throws IdManagerException {
388 IdHolder availableIds = localIdPool.getAvailableIds();
389 if (availableIds != null) {
390 Optional<Long> availableId = availableIds.allocateId();
391 if (availableId.isPresent()) {
392 IdHolderSyncJob poolSyncJob =
393 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(), txRunner,
395 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
396 return availableId.get();
399 IdHolder releasedIds = localIdPool.getReleasedIds();
400 Optional<Long> releasedId = releasedIds.allocateId();
401 if (releasedId.isPresent()) {
402 IdHolderSyncJob poolSyncJob =
403 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), txRunner,
405 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
406 return releasedId.get();
408 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
410 if (LOG.isDebugEnabled()) {
411 LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
413 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
419 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
421 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
422 throws IdManagerException {
423 if (LOG.isDebugEnabled()) {
424 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
426 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
427 parentPoolName = parentPoolName.intern();
428 idUtils.lock(lockManager, parentPoolName);
430 // Check if the childpool already got id block.
431 long availableIdCount =
432 localIdPool.getAvailableIds().getAvailableIdCount()
433 + localIdPool.getReleasedIds().getAvailableIdCount();
434 if (availableIdCount > 0) {
435 return availableIdCount;
437 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
438 Optional<IdPool> parentIdPool = confTx.read(idPoolInstanceIdentifier).get();
439 if (parentIdPool.isPresent()) {
440 return allocateIdBlockFromParentPool(localIdPool, parentIdPool.get(), confTx);
442 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
443 idPoolInstanceIdentifier);
446 } catch (InterruptedException | ExecutionException e) {
447 throw new IdManagerException("Error getting id block from parent pool", e);
449 idUtils.unlock(lockManager, parentPoolName);
453 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool,
454 TypedWriteTransaction<Configuration> confTx)
455 throws OperationFailedException, IdManagerException {
457 long idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
462 ReleasedIdsHolderBuilder releasedIdsBuilderParent = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
463 final List<DelayedIdEntries> delayedEntries = releasedIdsBuilderParent.getDelayedIdEntries();
464 if (delayedEntries != null) {
465 releasedIdsBuilderParent.setDelayedIdEntries(new ArrayList<>(delayedEntries));
469 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool,
474 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
476 if (LOG.isDebugEnabled()) {
477 LOG.debug("Unable to allocate Id block from global pool");
479 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
482 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
489 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
490 throws OperationFailedException {
491 @Nullable Map<ChildPoolsKey, ChildPools> childPoolsMap = parentIdPool.nonnullChildPools();
492 // Sorting the child pools on last accessed time so that the pool that
493 // was not accessed for a long time comes first.
495 List<ChildPools> list = childPoolsMap.values().stream().collect(Collectors.toList());
496 list.sort(comparing(ChildPools::getLastAccessTime));
498 long currentTime = System.currentTimeMillis() / 1000;
499 for (ChildPools childPools : childPoolsMap.values()) {
500 if (childPools.getLastAccessTime().toJava() + DEFAULT_IDLE_TIME > currentTime) {
503 if (!Objects.equals(childPools.getChildPoolName(), idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
504 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
505 .getIdPoolInstance(childPools.getChildPoolName());
506 IdPool otherChildPool =
507 singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier);
508 ReleasedIdsHolderBuilder releasedIds = IdUtils.getReleaseIdsHolderBuilder(otherChildPool);
510 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
511 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
512 if (delayedIdEntriesParent == null) {
513 delayedIdEntriesParent = new LinkedList<>();
515 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
516 delayedIdEntriesChild.clear();
518 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
519 while (idUtils.isIdAvailable(availableIds)) {
520 long cursor = availableIds.getCursor() + 1;
521 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
522 availableIds.setCursor(cursor);
525 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
526 + idUtils.getAvailableIdsCount(availableIds);
527 long count = releasedIdsBuilderParent.getAvailableIdCount().toJava() + totalAvailableIdCount;
528 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
529 singleTxDB.syncUpdate(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier,
530 new IdPoolBuilder().withKey(new IdPoolKey(otherChildPool.getPoolName()))
531 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
533 return totalAvailableIdCount;
539 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
540 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool,
541 TypedWriteTransaction<Configuration> confTx) {
542 if (releasedIdsBuilderParent.getAvailableIdCount().toJava() == 0) {
543 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
546 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
547 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize().toJava());
548 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
549 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
550 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
551 List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
553 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
554 .getId().toJava(), delayedIdEntry.getReadyTimeSec().toJava()))
555 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
556 .collect(toCollection(ArrayList::new));
557 delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
558 releasedIds.replaceDelayedEntries(delayedIdEntriesFromParentPool);
559 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
560 localIdPool.setReleasedIds(releasedIds);
561 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
562 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
563 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
564 .builder(IdPools.class).child(IdPool.class,
565 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
566 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount().toJava() - idCount);
567 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
568 confTx.mergeParentStructureMerge(releasedIdsHolderInstanceIdentifier, releasedIdsBuilderParent.build());
572 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
573 TypedWriteTransaction<Configuration> confTx) {
575 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
576 long end = availableIdsBuilderParent.getEnd().toJava();
577 long cur = availableIdsBuilderParent.getCursor();
578 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
579 if (LOG.isDebugEnabled()) {
580 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
584 // Update availableIdsHolder of Local Pool
585 idCount = Math.min(end - cur, parentIdPool.getBlockSize().toJava());
586 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
587 localIdPool.setAvailableIds(availableIds);
588 // Update availableIdsHolder of Global Pool
589 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
590 .builder(IdPools.class).child(IdPool.class,
591 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
592 availableIdsBuilderParent.setCursor(cur + idCount);
593 if (LOG.isDebugEnabled()) {
594 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
596 confTx.mergeParentStructureMerge(availableIdsHolderInstanceIdentifier, availableIdsBuilderParent.build());
600 private ReleaseIdOutputBuilder releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
601 throws IdManagerException, ReadFailedException, ExecutionException, InterruptedException {
602 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
603 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
604 CountDownLatch latch = idUtils.getReleaseIdLatch(idLatchKey);
607 if (!latch.await(10, TimeUnit.SECONDS)) {
608 LOG.warn("Timed out while releasing id {} from id pool {}", idKey, parentPoolName);
610 } catch (InterruptedException ignored) {
611 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
613 idUtils.removeReleaseIdLatch(idLatchKey);
616 localPoolName = localPoolName.intern();
617 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
618 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier);
619 @Nullable Map<IdEntriesKey, IdEntries> idEntries = parentIdPool.getIdEntries();
620 if (idEntries == null) {
621 throw new IdDoesNotExistException(parentPoolName, idKey);
623 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
624 Optional<IdEntries> existingIdEntryObject =
625 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
626 if (!existingIdEntryObject.isPresent()) {
627 LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
628 idUtils.unlock(lockManager, idLatchKey);
629 throw new IdManagerException(String.format("Specified Id key %s does not exist in id pool %s",
630 idKey, parentPoolName));
632 IdEntries existingIdEntry = existingIdEntryObject.get();
633 List<Uint32> idValuesList = nonnull(existingIdEntry.getIdValue());
634 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
635 boolean isRemoved = idEntries.values().contains(existingIdEntry);
636 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
637 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
638 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), txRunner,
640 jobCoordinator.enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
641 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize().toJava());
642 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
643 // Updating id entries in the parent pool. This will be used for restart scenario
644 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, txRunner, idUtils,
646 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
647 return new ReleaseIdOutputBuilder().setIdValues(idValuesList);
650 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
651 final String parentPoolName, final int blockSize) {
652 TimerTask scheduledTask = new TimerTask() {
656 new CleanUpJob(localIdPoolCache, txRunner, broker, parentPoolName, blockSize, lockManager,
657 idUtils, jobCoordinator);
658 jobCoordinator.enqueueJob(localIdPoolCache.getPoolName(), job, IdUtils.RETRY_COUNT);
661 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
/**
 * Reads the global ID pool named {@code poolName} through the supplied transaction and, when
 * no such pool exists, builds one with {@code idUtils.createGlobalPool} and writes it with
 * {@code mergeParentStructurePut}. When the pool already exists, the stored instance is used.
 *
 * @param confTx read-write configuration transaction used for the read and, if needed, the put
 * @param poolName name of the global pool
 * @param low passed to {@code idUtils.createGlobalPool} when a new pool is built
 * @param high passed to {@code idUtils.createGlobalPool} when a new pool is built
 * @param blockSize passed to {@code idUtils.createGlobalPool} when a new pool is built
 * @return presumably the newly created or the already existing pool - TODO confirm
 * @throws IdManagerException if reading the existing pool fails with an
 *         {@code ExecutionException} or {@code InterruptedException}
 */
664 private IdPool createGlobalPool(TypedReadWriteTransaction<Configuration> confTx, String poolName, long low,
665 long high, long blockSize) throws IdManagerException {
667 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
// Blocks on the read future; failures surface as ExecutionException / InterruptedException.
669 Optional<IdPool> existingIdPool = confTx.read(idPoolInstanceIdentifier).get();
670 if (!existingIdPool.isPresent()) {
671 if (LOG.isDebugEnabled()) {
672 LOG.debug("Creating new global pool {}", poolName);
674 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
// The put is only staged on confTx here; committing is left to whoever owns the transaction.
675 confTx.mergeParentStructurePut(idPoolInstanceIdentifier, idPool);
// Pool already exists: reuse the stored instance rather than overwriting it.
677 idPool = existingIdPool.get();
678 if (LOG.isDebugEnabled()) {
679 LOG.debug("GlobalPool exists {}", idPool);
// NOTE(review): InterruptedException is wrapped without restoring the thread's
// interrupt status - confirm whether callers rely on the flag.
683 } catch (ExecutionException | InterruptedException e) {
684 throw new IdManagerException("Error retrieving the existing id pool for " + poolName, e);
/**
 * Builds a new {@code IdLocalPool} named {@code localPoolName}, allocates an ID block for it
 * from the parent pool {@code idPool}, caches it in {@code localPool} under the parent pool's
 * name, and enqueues a {@code LocalPoolCreateJob} for it.
 *
 * @param confTx configuration transaction passed to {@code allocateIdBlockFromParentPool}
 * @param localPoolName name of the local pool; also the key of the enqueued job
 * @return presumably the newly built local pool - TODO confirm
 * @throws OperationFailedException declared; presumably propagated from the block allocation
 * @throws IdManagerException declared; presumably propagated from the block allocation
 */
688 private IdLocalPool createLocalPool(TypedWriteTransaction<Configuration> confTx, String localPoolName,
690 throws OperationFailedException, IdManagerException {
// NOTE(review): the name is interned before use - presumably so that equal names share one
// String instance for keying; confirm why this is required.
691 localPoolName = localPoolName.intern();
692 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
693 allocateIdBlockFromParentPool(idLocalPool, idPool, confTx);
694 String parentPool = idPool.getPoolName();
// The in-memory cache is keyed by the parent pool's name, not by the local pool's name.
695 localPool.put(parentPool, idLocalPool);
696 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, txRunner, idPool.getPoolName(),
697 idPool.getBlockSize().toJava(), idUtils);
698 jobCoordinator.enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
/**
 * Enqueues a {@code LocalPoolDeleteJob} for the pool named {@code poolName}; the job is keyed
 * by that same name and enqueued with {@code IdUtils.RETRY_COUNT}.
 *
 * @param poolName name of the pool handed to the delete job
 */
702 private void deletePool(String poolName) {
703 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, txRunner, idUtils);
704 jobCoordinator.enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
/**
 * Drops the cached local pool stored under {@code parentPoolName}, but only when a pool is
 * cached for that parent and its name equals {@code poolName}.
 *
 * @param parentPoolName key of the cache entry to inspect
 * @param poolName name the cached local pool must have for the entry to be removed
 */
707 public void poolDeleted(String parentPoolName, String poolName) {
708 IdLocalPool idLocalPool = localPool.get(parentPoolName);
709 if (idLocalPool != null) {
// Guard against evicting a different local pool that is cached for the same parent.
710 if (idLocalPool.getPoolName().equals(poolName)) {
711 localPool.remove(parentPoolName);
/**
 * Adds every ID in {@code idsList} to the released-IDs holder of {@code localPoolCache} and
 * then stores that pool in the {@code localPool} cache under {@code parentPoolName}.
 *
 * @param idsList IDs to hand to the released-IDs holder
 * @param parentPoolName cache key under which the local pool is stored
 * @param localPoolCache local pool whose released-IDs holder receives the IDs
 */
716 private void updateDelayedEntriesInLocalCache(List<Uint32> idsList, String parentPoolName,
717 IdLocalPool localPoolCache) {
718 for (Uint32 idValue : idsList) {
719 localPoolCache.getReleasedIds().addId(idValue.toJava());
// Writes the pool back into the cache, replacing whatever was stored for this parent.
721 localPool.put(parentPoolName, localPoolCache);
/**
 * Takes the lock for {@code uniqueIdKey} and reads the ID entry for {@code idKey} under the
 * parent pool from the configuration datastore.
 *
 * <p>When an entry exists, its ID values are published through {@code futureIdValues} and
 * returned. Otherwise the list created at the start of the method (empty) is returned.
 *
 * @param parentPoolName name of the parent pool whose entries are searched
 * @param idKey key of the ID entry to look up
 * @param uniqueIdKey key used for the lock and for {@code idUtils.removeAllocatedIds}
 * @param futureIdValues future completed with the existing values when an entry is found
 * @param hasExistingFutureIdValues when {@code false}, the allocated-IDs entry for
 *        {@code uniqueIdKey} is removed after the future has been completed
 * @return the ID values of the existing entry, or an empty list when there is none
 * @throws IdManagerException declared; the visible code does not show where it is raised
 * @throws InterruptedException declared; presumably from the datastore read
 * @throws ExecutionException declared; presumably from the datastore read
 */
724 private List<Uint32> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
725 CompletableFuture<List<Uint32>> futureIdValues, boolean hasExistingFutureIdValues)
726 throws IdManagerException, InterruptedException , ExecutionException {
727 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
728 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
// NOTE(review): the lock is taken before the read; if the read throws, no unlock is visible
// in this method - presumably the caller releases it; confirm.
729 idUtils.lock(lockManager, uniqueIdKey);
730 List<Uint32> newIdValuesList = new ArrayList<>();
731 Optional<IdEntries> existingIdEntry =
732 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
733 if (existingIdEntry.isPresent()) {
734 newIdValuesList = existingIdEntry.get().getIdValue();
735 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
736 // Inform other waiting threads about this new value.
737 futureIdValues.complete(newIdValuesList);
738 // This is to avoid stale entries in the map. If this thread had populated the map,
739 // then the entry should be removed.
740 if (!hasExistingFutureIdValues) {
741 idUtils.removeAllocatedIds(uniqueIdKey);
// An existing entry was found, so the lock taken above is released before returning it.
743 idUtils.unlock(lockManager, uniqueIdKey);
744 return newIdValuesList;
// NOTE(review): no unlock is visible on this path, so the lock presumably stays held for the
// caller to continue the allocation and release it - confirm against the caller.
746 return newIdValuesList;
/**
 * Looks up the local pool cached for {@code parentPoolName} and, when none is cached, takes
 * the lock for {@code parentPoolName} and tries to obtain one.
 *
 * <p>Under the lock the child pool {@code localPoolName} is first read from the configuration
 * datastore and, if present, loaded into the cache via {@code updateLocalIdPoolCache}. If the
 * cache still has no entry, a new read-write transaction reads the parent pool and calls
 * {@code createLocalPool}; a missing parent pool raises
 * {@code ExpectedDataObjectNotFoundException} inside that transaction.
 *
 * @param parentPoolName name of the parent pool; cache key and lock key
 * @param localPoolName name of the local (child) pool to load or create
 * @return presumably the cached or newly created local pool - TODO confirm
 * @throws IdManagerException if creating the local pool fails with an
 *         {@code InterruptedException} or {@code ExecutionException}
 */
749 private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
750 throws IdManagerException {
// First lookup is done without taking the lock.
751 IdLocalPool localIdPool = localPool.get(parentPoolName);
752 if (localIdPool == null) {
753 idUtils.lock(lockManager, parentPoolName);
755 // Check if a previous thread that got the cluster-wide lock
756 // first, has created the localPool
758 InstanceIdentifier<IdPool> childIdPoolInstanceIdentifier = idUtils
759 .getIdPoolInstance(localPoolName);
760 Optional<IdPool> childIdPoolOpt = singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
761 childIdPoolInstanceIdentifier);
762 if (childIdPoolOpt.isPresent()) {
763 updateLocalIdPoolCache(childIdPoolOpt.get(), parentPoolName);
// NOTE(review): a failed read is only logged at debug level (message only, no stack trace)
// and processing continues; the interrupt status is not restored - confirm this is intended.
766 catch (ExecutionException | InterruptedException ex) {
767 LOG.debug("Failed to read id pool {} due to {}", localPoolName, ex.getMessage());
// Re-check the cache: the datastore read above may have populated it.
769 if (localPool.get(parentPoolName) == null) {
771 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
772 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
773 .getIdPoolInstance(parentPoolName);
774 Optional<IdPool> parentIdPool = confTx.read(parentIdPoolInstanceIdentifier).get();
775 if (parentIdPool.isPresent()) {
776 // Return localIdPool
777 return createLocalPool(confTx, localPoolName, parentIdPool.get());
// A local pool cannot be created without its parent pool in the datastore.
779 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
780 parentIdPoolInstanceIdentifier);
783 } catch (InterruptedException | ExecutionException e) {
784 throw new IdManagerException("Error creating a local id pool", e);
787 localIdPool = localPool.get(parentPoolName);
790 idUtils.unlock(lockManager, parentPoolName);
/**
 * Draws IDs from the local pool cache into {@code newIdValuesList}, after checking that the
 * combined number of available IDs is larger than {@code size}.
 *
 * <p>The combined count is the sum of the available and released counts of the local pool
 * and the available and released counts of the parent pool as read from the configuration
 * datastore. If that count is not larger than {@code size}, an {@code IdManagerException}
 * reporting exhausted IDs is presumably what gets thrown - TODO confirm branch structure.
 *
 * @param parentPoolName name of the parent pool that is read and used in the error message
 * @param localPoolName name of the local pool; used only in the debug log message here
 * @param size requested number of IDs, compared against the combined available count
 * @param newIdValuesList list that receives the IDs drawn from the local pool cache
 * @param localIdPool local pool from which IDs are drawn
 * @param newIdValue working variable; overwritten with each ID drawn
 * @throws ReadFailedException declared; presumably from the parent pool read
 * @throws IdManagerException when the pool's IDs are exhausted
 */
796 private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Uint32> newIdValuesList,
797 IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
798 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
799 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier1);
// Start with what the local pool can supply: fresh IDs plus released IDs.
800 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
801 + localIdPool.getReleasedIds().getAvailableIdCount();
802 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
803 ReleasedIdsHolderBuilder releasedParentIds = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
// Add what the parent pool can still supply: released IDs plus fresh IDs.
804 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount().toJava()
805 + idUtils.getAvailableIdsCount(availableParentIds);
// NOTE(review): the comparison is strict, so a request for exactly the combined count does
// not enter this branch - confirm that is intended.
806 if (totalAvailableIdCount > size) {
809 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
810 } catch (IdManagerException e) {
811 if (LOG.isDebugEnabled()) {
812 LOG.debug("Releasing IDs to pool {}", localPoolName);
814 // Releasing the IDs added in newIdValuesList since
815 // a null list would be returned now, as the
816 // requested size of list IDs exceeds the number of
818 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
// NOTE(review): confirm this add cannot run after the catch above has already handed the
// collected IDs back, which would append a stale newIdValue.
820 newIdValuesList.add(Uint32.valueOf(newIdValue));
824 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));