2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.idmanager;
10 import static java.util.Comparator.comparing;
11 import static java.util.stream.Collectors.toCollection;
12 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
13 import static org.opendaylight.yangtools.yang.binding.CodeHelpers.nonnull;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.stream.Collectors;
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
39 import org.apache.aries.blueprint.annotation.service.Reference;
40 import org.eclipse.jdt.annotation.Nullable;
41 import org.opendaylight.daexim.DataImportBootReady;
42 import org.opendaylight.genius.datastoreutils.ExpectedDataObjectNotFoundException;
43 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
44 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
45 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
46 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
47 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
48 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
49 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
50 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
51 import org.opendaylight.genius.mdsalutil.NwConstants;
52 import org.opendaylight.genius.mdsalutil.interfaces.ShardStatusMonitor;
53 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
54 import org.opendaylight.mdsal.binding.api.DataBroker;
55 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
56 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
57 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
58 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
59 import org.opendaylight.mdsal.binding.util.TypedWriteTransaction;
60 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
61 import org.opendaylight.mdsal.common.api.ReadFailedException;
62 import org.opendaylight.serviceutils.tools.rpc.FutureRpcResults;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutputBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPoolsKey;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntriesKey;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
92 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
93 import org.opendaylight.yangtools.yang.common.OperationFailedException;
94 import org.opendaylight.yangtools.yang.common.RpcResult;
95 import org.opendaylight.yangtools.yang.common.Uint32;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
100 public class IdManager implements IdManagerService, IdManagerMonitor {
101 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
102 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
104 private final DataBroker broker;
105 private final ManagedNewTransactionRunner txRunner;
106 private final SingleTransactionDataBroker singleTxDB;
107 private final LockManagerService lockManager;
108 private final IdUtils idUtils;
109 private final JobCoordinator jobCoordinator;
111 private final ConcurrentMap<String, IdLocalPool> localPool;
112 private final Timer cleanJobTimer = new Timer();
115 public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
116 @Reference DataImportBootReady dataImportBootReady,
117 @Reference JobCoordinator jobCoordinator, @Reference ShardStatusMonitor shardStatusMonitor)
118 throws ReadFailedException, InterruptedException {
120 this.txRunner = new ManagedNewTransactionRunnerImpl(db);
121 this.singleTxDB = new SingleTransactionDataBroker(db);
122 this.lockManager = lockManager;
123 this.idUtils = idUtils;
124 this.jobCoordinator = jobCoordinator;
126 // NB: We do not "use" the DataImportBootReady, but it's presence in the OSGi
127 // Service Registry is the required "signal" that the Daexim "import on boot"
128 // has fully completed (which we want to wait for). Therefore, making this
129 // dependent on that defers the Blueprint initialization, as we'd like to,
130 // so that we do not start giving out new IDs before an import went in.
131 // Thus, please DO NOT remove the DataImportBootReady argument, even if
132 // it appears to be (is) un-used from a Java code PoV!
134 this.localPool = new ConcurrentHashMap<>();
135 boolean isDatastoreAvailable = false;
137 int totalRetry = 1000;
139 while (retryCount < totalRetry) {
140 isDatastoreAvailable = shardStatusMonitor.getShardStatus(NwConstants.IdManagerShards.getShardList());
141 if (isDatastoreAvailable) {
142 LOG.info("IDManager is UP");
146 LOG.error("IdManager: retrying shard status check for the {} time, pending retries {}",
147 ++retryCount, totalRetry - retryCount);
150 } catch (InterruptedException e) {
151 LOG.error("IDManager is DOWN, shard status check failed");
154 if (!isDatastoreAvailable) {
155 LOG.error("IDManager is DOWN, as shards were not available at bundle bringup");
160 public Map<String, String> getLocalPoolsDetails() {
161 Map<String, String> map = new HashMap<>();
162 localPool.forEach((key, value) -> map.put(key, value.toString()));
167 public void start() {
168 LOG.info("{} start", getClass().getSimpleName());
172 public void close() {
173 cleanJobTimer.cancel();
175 LOG.info("{} close", getClass().getSimpleName());
178 private void populateCache() throws InterruptedException {
179 // If IP changes during reboot, then there will be orphaned child pools.
180 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
181 Optional<IdPools> idPoolsOptional;
184 idPoolsOptional = singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, idPoolsInstance);
186 } catch (ExecutionException e) {
187 LOG.error("Failed to read the id pools due to error. Retrying again...", e);
191 if (!idPoolsOptional.isPresent()) {
194 idPoolsOptional.get().nonnullIdPool().values()
196 .filter(idPool -> idPool.getParentPoolName() != null
197 && !idPool.getParentPoolName().isEmpty()
198 && idUtils.getLocalPoolName(idPool.getParentPoolName())
199 .equals(idPool.getPoolName()))
201 idPool -> updateLocalIdPoolCache(idPool,
202 idPool.getParentPoolName()));
205 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
206 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
207 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart().toJava(),
208 availableIdsHolder.getEnd().toJava());
209 availableIdHolder.setCur(availableIdsHolder.getCursor());
210 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
211 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils,
212 releasedIdsHolder.getDelayedTimeSec().toJava());
213 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount().toJava());
214 List<DelayedIdEntry> delayedIdEntryInCache = releasedIdsHolder.nonnullDelayedIdEntries()
216 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
217 .getId().toJava(), delayedIdEntry.getReadyTimeSec().toJava()))
218 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
219 .collect(toCollection(ArrayList::new));
221 releasedIdHolder.replaceDelayedEntries(delayedIdEntryInCache);
223 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
224 idLocalPool.setAvailableIds(availableIdHolder);
225 idLocalPool.setReleasedIds(releasedIdHolder);
226 localPool.put(parentPoolName, idLocalPool);
227 if (LOG.isDebugEnabled()) {
228 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
234 public ListenableFuture<RpcResult<CreateIdPoolOutput>> createIdPool(CreateIdPoolInput input) {
235 LOG.info("createIdPool called with input {}", input);
236 long low = input.getLow().toJava();
237 long high = input.getHigh().toJava();
238 long blockSize = idUtils.computeBlockSize(low, high);
239 return FutureRpcResults.fromListenableFuture(LOG, "createIdPool", input, () -> {
240 String poolName = input.getPoolName().intern();
242 idUtils.lock(lockManager, poolName);
243 return Futures.transform(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
244 IdPool idPool = createGlobalPool(confTx, poolName, low, high, blockSize);
245 String localPoolName = idUtils.getLocalPoolName(poolName);
246 IdLocalPool idLocalPool = localPool.get(poolName);
247 if (idLocalPool == null) {
248 createLocalPool(confTx, localPoolName, idPool);
249 idUtils.updateChildPool(confTx, idPool.getPoolName(), localPoolName);
251 }), unused -> new CreateIdPoolOutputBuilder().build(), MoreExecutors.directExecutor());
253 idUtils.unlock(lockManager, poolName);
259 public ListenableFuture<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
260 String idKey = input.getIdKey();
261 String poolName = input.getPoolName();
262 return FutureRpcResults.fromBuilder(LOG, "allocateId", input, () -> {
263 String localPoolName = idUtils.getLocalPoolName(poolName);
264 // allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
265 long newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0).toJava();
266 return new AllocateIdOutputBuilder().setIdValue(newIdValue);
267 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
270 private void completeExceptionallyIfPresent(String poolName, String idKey, Throwable exception) {
271 CompletableFuture<List<Uint32>> completableFuture =
272 idUtils.removeAllocatedIds(idUtils.getUniqueKey(poolName, idKey));
273 if (completableFuture != null) {
274 completableFuture.completeExceptionally(exception);
279 public ListenableFuture<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
280 String idKey = input.getIdKey();
281 String poolName = input.getPoolName();
282 long size = input.getSize().toJava();
283 String localPoolName = idUtils.getLocalPoolName(poolName);
284 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
285 return FutureRpcResults.fromBuilder(LOG, "allocateIdRange", input, () -> {
286 List<Uint32> newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
287 Collections.sort(newIdValuesList);
288 output.setIdValues(newIdValuesList);
290 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
294 public ListenableFuture<RpcResult<DeleteIdPoolOutput>> deleteIdPool(DeleteIdPoolInput input) {
295 return FutureRpcResults.fromListenableFuture(LOG, "deleteIdPool", input, () -> {
296 String poolName = input.getPoolName().intern();
297 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
298 synchronized (poolName) {
299 IdPool idPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
300 //List<ChildPools> childPoolList = idPool.getChildPools();
303 @Nullable Map<ChildPoolsKey, ChildPools> childPoolList = idPool.getChildPools();
304 if (childPoolList != null) {
305 childPoolList.forEach((childPool, childPools) -> {
306 deletePool(childPool.getChildPoolName());
309 singleTxDB.syncDelete(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
311 // TODO return the Future from a TBD asyncDelete instead.. BUT check that all callers @CheckReturnValue
312 return Futures.immediateFuture((DeleteIdPoolOutput) null);
317 public ListenableFuture<RpcResult<ReleaseIdOutput>> releaseId(ReleaseIdInput input) {
318 String poolName = input.getPoolName();
319 String idKey = input.getIdKey();
320 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
321 return FutureRpcResults.fromBuilder(LOG, "releaseId", input, () -> {
322 idUtils.lock(lockManager, uniqueKey);
323 return releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
324 }).onFailureLogLevel(FutureRpcResults.LogLevel.NONE)
326 if (e instanceof IdDoesNotExistException) {
327 // Do not log full stack trace in case ID does not exist
328 LOG.error("RPC releaseId() failed due to IdDoesNotExistException; input = {}", input);
330 // But for all other cases do:
331 LOG.error("RPC releaseId() failed; input = {}", input, e);
333 idUtils.unlock(lockManager, uniqueKey);
337 private List<Uint32> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
338 String idKey, long size) throws OperationFailedException, IdManagerException, ExecutionException,
339 InterruptedException {
340 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
341 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
342 CompletableFuture<List<Uint32>> futureIdValues = new CompletableFuture<>();
343 CompletableFuture<List<Uint32>> existingFutureIdValue =
344 idUtils.putAllocatedIdsIfAbsent(uniqueIdKey, futureIdValues);
345 if (existingFutureIdValue != null) {
347 return existingFutureIdValue.get();
348 } catch (InterruptedException | ExecutionException e) {
349 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
350 idKey, parentPoolName);
351 throw new IdManagerException(e.getMessage(), e);
355 List<Uint32> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
357 if (!newIdValuesList.isEmpty()) {
358 return newIdValuesList;
360 //This get will not help in concurrent reads. Hence the same read needs to be done again.
361 IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
362 LOG.debug("Got pool {}", localIdPool);
363 long newIdValue = -1;
364 localPoolName = localPoolName.intern();
366 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
367 newIdValuesList.add(Uint32.valueOf(newIdValue));
369 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
371 LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
372 idUtils.putReleaseIdLatch(uniqueIdKey, new CountDownLatch(1));
373 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, txRunner,
374 idUtils, lockManager);
375 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
376 futureIdValues.complete(newIdValuesList);
377 return newIdValuesList;
378 } catch (OperationFailedException | IdManagerException e) {
379 idUtils.unlock(lockManager, uniqueIdKey);
384 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
385 throws IdManagerException {
387 IdHolder availableIds = localIdPool.getAvailableIds();
388 if (availableIds != null) {
389 Optional<Long> availableId = availableIds.allocateId();
390 if (availableId.isPresent()) {
391 IdHolderSyncJob poolSyncJob =
392 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(), txRunner,
394 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
395 return availableId.get();
398 IdHolder releasedIds = localIdPool.getReleasedIds();
399 Optional<Long> releasedId = releasedIds.allocateId();
400 if (releasedId.isPresent()) {
401 IdHolderSyncJob poolSyncJob =
402 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), txRunner,
404 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
405 return releasedId.get();
407 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
409 if (LOG.isDebugEnabled()) {
410 LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
412 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
418 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
420 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
421 throws IdManagerException {
422 if (LOG.isDebugEnabled()) {
423 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
425 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
426 parentPoolName = parentPoolName.intern();
427 idUtils.lock(lockManager, parentPoolName);
429 // Check if the childpool already got id block.
430 long availableIdCount =
431 localIdPool.getAvailableIds().getAvailableIdCount()
432 + localIdPool.getReleasedIds().getAvailableIdCount();
433 if (availableIdCount > 0) {
434 return availableIdCount;
436 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
437 Optional<IdPool> parentIdPool = confTx.read(idPoolInstanceIdentifier).get();
438 if (parentIdPool.isPresent()) {
439 return allocateIdBlockFromParentPool(localIdPool, parentIdPool.get(), confTx);
441 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
442 idPoolInstanceIdentifier);
445 } catch (InterruptedException | ExecutionException e) {
446 throw new IdManagerException("Error getting id block from parent pool", e);
448 idUtils.unlock(lockManager, parentPoolName);
452 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool,
453 TypedWriteTransaction<Configuration> confTx)
454 throws OperationFailedException, IdManagerException {
456 long idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
461 ReleasedIdsHolderBuilder releasedIdsBuilderParent = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
462 final List<DelayedIdEntries> delayedEntries = releasedIdsBuilderParent.getDelayedIdEntries();
463 if (delayedEntries != null) {
464 releasedIdsBuilderParent.setDelayedIdEntries(new ArrayList<>(delayedEntries));
468 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool,
473 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
475 if (LOG.isDebugEnabled()) {
476 LOG.debug("Unable to allocate Id block from global pool");
478 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
481 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
488 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
489 throws OperationFailedException {
490 @Nullable Map<ChildPoolsKey, ChildPools> childPoolsMap = parentIdPool.nonnullChildPools();
491 // Sorting the child pools on last accessed time so that the pool that
492 // was not accessed for a long time comes first.
494 List<ChildPools> list = childPoolsMap.values().stream().collect(Collectors.toList());
495 list.sort(comparing(ChildPools::getLastAccessTime));
497 long currentTime = System.currentTimeMillis() / 1000;
498 for (ChildPools childPools : childPoolsMap.values()) {
499 if (childPools.getLastAccessTime().toJava() + DEFAULT_IDLE_TIME > currentTime) {
502 if (!Objects.equals(childPools.getChildPoolName(), idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
503 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
504 .getIdPoolInstance(childPools.getChildPoolName());
505 IdPool otherChildPool =
506 singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier);
507 ReleasedIdsHolderBuilder releasedIds = IdUtils.getReleaseIdsHolderBuilder(otherChildPool);
509 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
510 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
511 if (delayedIdEntriesParent == null) {
512 delayedIdEntriesParent = new LinkedList<>();
514 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
515 delayedIdEntriesChild.clear();
517 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
518 while (idUtils.isIdAvailable(availableIds)) {
519 long cursor = availableIds.getCursor() + 1;
520 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
521 availableIds.setCursor(cursor);
524 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
525 + idUtils.getAvailableIdsCount(availableIds);
526 long count = releasedIdsBuilderParent.getAvailableIdCount().toJava() + totalAvailableIdCount;
527 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
528 singleTxDB.syncUpdate(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier,
529 new IdPoolBuilder().withKey(new IdPoolKey(otherChildPool.getPoolName()))
530 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
532 return totalAvailableIdCount;
538 private static long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
539 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool,
540 TypedWriteTransaction<Configuration> confTx) {
541 if (releasedIdsBuilderParent.getAvailableIdCount().toJava() == 0) {
542 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
545 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
546 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize().toJava());
547 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
548 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
549 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
550 List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
552 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
553 .getId().toJava(), delayedIdEntry.getReadyTimeSec().toJava()))
554 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
555 .collect(toCollection(ArrayList::new));
556 delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
557 releasedIds.replaceDelayedEntries(delayedIdEntriesFromParentPool);
558 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
559 localIdPool.setReleasedIds(releasedIds);
560 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
561 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
562 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
563 .builder(IdPools.class).child(IdPool.class,
564 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
565 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount().toJava() - idCount);
566 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
567 confTx.mergeParentStructureMerge(releasedIdsHolderInstanceIdentifier, releasedIdsBuilderParent.build());
571 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
572 TypedWriteTransaction<Configuration> confTx) {
574 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
575 long end = availableIdsBuilderParent.getEnd().toJava();
576 long cur = availableIdsBuilderParent.getCursor();
577 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
578 if (LOG.isDebugEnabled()) {
579 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
583 // Update availableIdsHolder of Local Pool
584 idCount = Math.min(end - cur, parentIdPool.getBlockSize().toJava());
585 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
586 localIdPool.setAvailableIds(availableIds);
587 // Update availableIdsHolder of Global Pool
588 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
589 .builder(IdPools.class).child(IdPool.class,
590 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
591 availableIdsBuilderParent.setCursor(cur + idCount);
592 if (LOG.isDebugEnabled()) {
593 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
595 confTx.mergeParentStructureMerge(availableIdsHolderInstanceIdentifier, availableIdsBuilderParent.build());
599 private ReleaseIdOutputBuilder releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
600 throws IdManagerException, ReadFailedException, ExecutionException, InterruptedException {
601 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
602 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
603 CountDownLatch latch = idUtils.getReleaseIdLatch(idLatchKey);
606 if (!latch.await(10, TimeUnit.SECONDS)) {
607 LOG.warn("Timed out while releasing id {} from id pool {}", idKey, parentPoolName);
609 } catch (InterruptedException ignored) {
610 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
612 idUtils.removeReleaseIdLatch(idLatchKey);
615 localPoolName = localPoolName.intern();
616 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
617 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier);
618 @Nullable Map<IdEntriesKey, IdEntries> idEntries = parentIdPool.getIdEntries();
619 if (idEntries == null) {
620 throw new IdDoesNotExistException(parentPoolName, idKey);
622 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
623 Optional<IdEntries> existingIdEntryObject =
624 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
625 if (!existingIdEntryObject.isPresent()) {
626 LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
627 idUtils.unlock(lockManager, idLatchKey);
628 throw new IdManagerException(String.format("Specified Id key %s does not exist in id pool %s",
629 idKey, parentPoolName));
631 IdEntries existingIdEntry = existingIdEntryObject.get();
632 List<Uint32> idValuesList = nonnull(existingIdEntry.getIdValue());
633 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
634 boolean isRemoved = idEntries.values().contains(existingIdEntry);
635 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
636 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
637 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), txRunner,
639 jobCoordinator.enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
640 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize().toJava());
641 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
642 // Updating id entries in the parent pool. This will be used for restart scenario
643 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, txRunner, idUtils,
645 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
646 return new ReleaseIdOutputBuilder().setIdValues(idValuesList);
649 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
650 final String parentPoolName, final int blockSize) {
651 TimerTask scheduledTask = new TimerTask() {
655 new CleanUpJob(localIdPoolCache, txRunner, broker, parentPoolName, blockSize, lockManager,
656 idUtils, jobCoordinator);
657 jobCoordinator.enqueueJob(localIdPoolCache.getPoolName(), job, IdUtils.RETRY_COUNT);
660 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
663 private IdPool createGlobalPool(TypedReadWriteTransaction<Configuration> confTx, String poolName, long low,
664 long high, long blockSize) throws IdManagerException {
666 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
668 Optional<IdPool> existingIdPool = confTx.read(idPoolInstanceIdentifier).get();
669 if (!existingIdPool.isPresent()) {
670 if (LOG.isDebugEnabled()) {
671 LOG.debug("Creating new global pool {}", poolName);
673 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
674 confTx.mergeParentStructurePut(idPoolInstanceIdentifier, idPool);
676 idPool = existingIdPool.get();
677 if (LOG.isDebugEnabled()) {
678 LOG.debug("GlobalPool exists {}", idPool);
682 } catch (ExecutionException | InterruptedException e) {
683 throw new IdManagerException("Error retrieving the existing id pool for " + poolName, e);
687 private IdLocalPool createLocalPool(TypedWriteTransaction<Configuration> confTx, String localPoolName,
689 throws OperationFailedException, IdManagerException {
690 localPoolName = localPoolName.intern();
691 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
692 allocateIdBlockFromParentPool(idLocalPool, idPool, confTx);
693 String parentPool = idPool.getPoolName();
694 localPool.put(parentPool, idLocalPool);
695 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, txRunner, idPool.getPoolName(),
696 idPool.getBlockSize().toJava(), idUtils);
697 jobCoordinator.enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
701 private void deletePool(String poolName) {
702 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, txRunner, idUtils);
703 jobCoordinator.enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
706 public void poolDeleted(String parentPoolName, String poolName) {
707 IdLocalPool idLocalPool = localPool.get(parentPoolName);
708 if (idLocalPool != null) {
709 if (idLocalPool.getPoolName().equals(poolName)) {
710 localPool.remove(parentPoolName);
715 private void updateDelayedEntriesInLocalCache(List<Uint32> idsList, String parentPoolName,
716 IdLocalPool localPoolCache) {
717 for (Uint32 idValue : idsList) {
718 localPoolCache.getReleasedIds().addId(idValue.toJava());
720 localPool.put(parentPoolName, localPoolCache);
723 private List<Uint32> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
724 CompletableFuture<List<Uint32>> futureIdValues, boolean hasExistingFutureIdValues)
725 throws IdManagerException, InterruptedException , ExecutionException {
726 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
727 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
728 idUtils.lock(lockManager, uniqueIdKey);
729 List<Uint32> newIdValuesList = new ArrayList<>();
730 Optional<IdEntries> existingIdEntry =
731 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
732 if (existingIdEntry.isPresent()) {
733 newIdValuesList = existingIdEntry.get().getIdValue();
734 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
735 // Inform other waiting threads about this new value.
736 futureIdValues.complete(newIdValuesList);
737 // This is to avoid stale entries in the map. If this thread had populated the map,
738 // then the entry should be removed.
739 if (!hasExistingFutureIdValues) {
740 idUtils.removeAllocatedIds(uniqueIdKey);
742 idUtils.unlock(lockManager, uniqueIdKey);
743 return newIdValuesList;
745 return newIdValuesList;
748 private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
749 throws IdManagerException {
750 IdLocalPool localIdPool = localPool.get(parentPoolName);
751 if (localIdPool == null) {
752 idUtils.lock(lockManager, parentPoolName);
754 // Check if a previous thread that got the cluster-wide lock
755 // first, has created the localPool
757 InstanceIdentifier<IdPool> childIdPoolInstanceIdentifier = idUtils
758 .getIdPoolInstance(localPoolName);
759 Optional<IdPool> childIdPoolOpt = singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
760 childIdPoolInstanceIdentifier);
761 if (childIdPoolOpt.isPresent()) {
762 updateLocalIdPoolCache(childIdPoolOpt.get(), parentPoolName);
765 catch (ExecutionException | InterruptedException ex) {
766 LOG.debug("Failed to read id pool {} due to {}", localPoolName, ex.getMessage());
768 if (localPool.get(parentPoolName) == null) {
770 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
771 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
772 .getIdPoolInstance(parentPoolName);
773 Optional<IdPool> parentIdPool = confTx.read(parentIdPoolInstanceIdentifier).get();
774 if (parentIdPool.isPresent()) {
775 // Return localIdPool
776 return createLocalPool(confTx, localPoolName, parentIdPool.get());
778 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
779 parentIdPoolInstanceIdentifier);
782 } catch (InterruptedException | ExecutionException e) {
783 throw new IdManagerException("Error creating a local id pool", e);
786 localIdPool = localPool.get(parentPoolName);
789 idUtils.unlock(lockManager, parentPoolName);
795 private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Uint32> newIdValuesList,
796 IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
797 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
798 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier1);
799 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
800 + localIdPool.getReleasedIds().getAvailableIdCount();
801 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
802 ReleasedIdsHolderBuilder releasedParentIds = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
803 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount().toJava()
804 + idUtils.getAvailableIdsCount(availableParentIds);
805 if (totalAvailableIdCount > size) {
808 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
809 } catch (IdManagerException e) {
810 if (LOG.isDebugEnabled()) {
811 LOG.debug("Releasing IDs to pool {}", localPoolName);
813 // Releasing the IDs added in newIdValuesList since
814 // a null list would be returned now, as the
815 // requested size of list IDs exceeds the number of
817 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
819 newIdValuesList.add(Uint32.valueOf(newIdValue));
823 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));