2 * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.idmanager;
10 import static java.util.Comparator.comparing;
11 import static java.util.stream.Collectors.toCollection;
12 import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
13 import static org.opendaylight.genius.idmanager.IdUtils.nullToEmpty;
14 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.LinkedList;
24 import java.util.List;
26 import java.util.Objects;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeUnit;
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
40 import org.apache.aries.blueprint.annotation.service.Reference;
41 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
42 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
43 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
44 import org.opendaylight.daexim.DataImportBootReady;
45 import org.opendaylight.genius.datastoreutils.ExpectedDataObjectNotFoundException;
46 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
47 import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
48 import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
49 import org.opendaylight.genius.idmanager.jobs.CleanUpJob;
50 import org.opendaylight.genius.idmanager.jobs.IdHolderSyncJob;
51 import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
52 import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
53 import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
54 import org.opendaylight.genius.infra.Datastore.Configuration;
55 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
56 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
57 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
58 import org.opendaylight.genius.infra.TypedWriteTransaction;
59 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
60 import org.opendaylight.serviceutils.tools.mdsal.rpc.FutureRpcResults;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.AvailableIdsHolderBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ChildPools;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.IdEntries;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.id.pool.ReleasedIdsHolderBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.released.ids.DelayedIdEntries;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
87 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
88 import org.opendaylight.yangtools.yang.common.OperationFailedException;
89 import org.opendaylight.yangtools.yang.common.RpcResult;
90 import org.slf4j.Logger;
91 import org.slf4j.LoggerFactory;
94 public class IdManager implements IdManagerService, IdManagerMonitor {
96 private static final Logger LOG = LoggerFactory.getLogger(IdManager.class);
97 private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
99 private final DataBroker broker;
100 private final ManagedNewTransactionRunner txRunner;
101 private final SingleTransactionDataBroker singleTxDB;
102 private final LockManagerService lockManager;
103 private final IdUtils idUtils;
104 private final JobCoordinator jobCoordinator;
106 private final ConcurrentMap<String, IdLocalPool> localPool;
107 private final Timer cleanJobTimer = new Timer();
/**
 * Wires up the collaborators and loads the local pool cache from the datastore.
 *
 * @param db data broker used for all datastore access
 * @param lockManager lock service handed to {@link IdUtils} lock/unlock calls
 * @param idUtils helper for pool names, instance identifiers and locking
 * @param dataImportBootReady unused marker dependency, see the note in the body
 * @param jobCoordinator coordinator on which the datastore sync jobs are enqueued
 * @throws ReadFailedException if the initial read of the ID pools fails
 */
@Inject
public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
        @Reference DataImportBootReady dataImportBootReady, JobCoordinator jobCoordinator)
        throws ReadFailedException {
    this.broker = db;
    this.txRunner = new ManagedNewTransactionRunnerImpl(db);
    this.singleTxDB = new SingleTransactionDataBroker(db);
    this.lockManager = lockManager;
    this.idUtils = idUtils;
    this.jobCoordinator = jobCoordinator;

    // NB: We do not "use" the DataImportBootReady, but its presence in the OSGi
    // Service Registry is the required "signal" that the Daexim "import on boot"
    // has fully completed (which we want to wait for). Therefore, making this
    // dependent on that defers the Blueprint initialization, as we'd like to,
    // so that we do not start giving out new IDs before an import went in.
    // Thus, please DO NOT remove the DataImportBootReady argument, even if
    // it appears to be (is) un-used from a Java code PoV!

    this.localPool = new ConcurrentHashMap<>();
    populateCache();
}
133 public Map<String, String> getLocalPoolsDetails() {
134 Map<String, String> map = new HashMap<>();
135 localPool.forEach((key, value) -> map.put(key, value.toString()));
140 public void start() {
141 LOG.info("{} start", getClass().getSimpleName());
145 public void close() {
146 cleanJobTimer.cancel();
148 LOG.info("{} close", getClass().getSimpleName());
151 private void populateCache() throws ReadFailedException {
152 // If IP changes during reboot, then there will be orphaned child pools.
153 InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
154 Optional<IdPools> idPoolsOptional =
155 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, idPoolsInstance);
156 if (!idPoolsOptional.isPresent()) {
159 nullToEmpty(idPoolsOptional.get().getIdPool())
161 .filter(idPool -> idPool.getParentPoolName() != null
162 && !idPool.getParentPoolName().isEmpty()
163 && idUtils.getLocalPoolName(idPool.getParentPoolName())
164 .equals(idPool.getPoolName()))
166 idPool -> updateLocalIdPoolCache(idPool,
167 idPool.getParentPoolName()));
170 public boolean updateLocalIdPoolCache(IdPool idPool, String parentPoolName) {
171 AvailableIdsHolder availableIdsHolder = idPool.getAvailableIdsHolder();
172 AvailableIdHolder availableIdHolder = new AvailableIdHolder(idUtils, availableIdsHolder.getStart(),
173 availableIdsHolder.getEnd());
174 availableIdHolder.setCur(availableIdsHolder.getCursor());
175 ReleasedIdsHolder releasedIdsHolder = idPool.getReleasedIdsHolder();
176 ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
177 releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
178 List<DelayedIdEntry> delayedIdEntryInCache = nullToEmpty(releasedIdsHolder.getDelayedIdEntries())
180 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
181 .getId(), delayedIdEntry.getReadyTimeSec()))
182 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
183 .collect(toCollection(ArrayList::new));
185 releasedIdHolder.replaceDelayedEntries(delayedIdEntryInCache);
187 IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
188 idLocalPool.setAvailableIds(availableIdHolder);
189 idLocalPool.setReleasedIds(releasedIdHolder);
190 localPool.put(parentPoolName, idLocalPool);
191 if (LOG.isDebugEnabled()) {
192 LOG.debug("Populating cache for {} with {}", idLocalPool.getPoolName(), idLocalPool);
198 public ListenableFuture<RpcResult<CreateIdPoolOutput>> createIdPool(CreateIdPoolInput input) {
199 LOG.info("createIdPool called with input {}", input);
200 long low = input.getLow();
201 long high = input.getHigh();
202 long blockSize = idUtils.computeBlockSize(low, high);
203 return FutureRpcResults.fromListenableFuture(LOG, "createIdPool", input, () -> {
204 String poolName = input.getPoolName().intern();
206 idUtils.lock(lockManager, poolName);
207 return Futures.transform(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
208 IdPool idPool = createGlobalPool(confTx, poolName, low, high, blockSize);
209 String localPoolName = idUtils.getLocalPoolName(poolName);
210 IdLocalPool idLocalPool = localPool.get(poolName);
211 if (idLocalPool == null) {
212 createLocalPool(confTx, localPoolName, idPool);
213 idUtils.updateChildPool(confTx, idPool.getPoolName(), localPoolName);
215 }), unused -> new CreateIdPoolOutputBuilder().build(), MoreExecutors.directExecutor());
217 idUtils.unlock(lockManager, poolName);
223 public ListenableFuture<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
224 String idKey = input.getIdKey();
225 String poolName = input.getPoolName();
226 return FutureRpcResults.fromBuilder(LOG, "allocateId", input, () -> {
227 String localPoolName = idUtils.getLocalPoolName(poolName);
228 // allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
229 long newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
230 return new AllocateIdOutputBuilder().setIdValue(newIdValue);
231 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
234 private void completeExceptionallyIfPresent(String poolName, String idKey, Throwable exception) {
235 CompletableFuture<List<Long>> completableFuture =
236 idUtils.removeAllocatedIds(idUtils.getUniqueKey(poolName, idKey));
237 if (completableFuture != null) {
238 completableFuture.completeExceptionally(exception);
243 public ListenableFuture<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
244 String idKey = input.getIdKey();
245 String poolName = input.getPoolName();
246 long size = input.getSize();
247 String localPoolName = idUtils.getLocalPoolName(poolName);
248 AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
249 return FutureRpcResults.fromBuilder(LOG, "allocateIdRange", input, () -> {
250 List<Long> newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
251 Collections.sort(newIdValuesList);
252 output.setIdValues(newIdValuesList);
254 }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
258 public ListenableFuture<RpcResult<DeleteIdPoolOutput>> deleteIdPool(DeleteIdPoolInput input) {
259 return FutureRpcResults.fromListenableFuture(LOG, "deleteIdPool", input, () -> {
260 String poolName = input.getPoolName().intern();
261 InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
262 synchronized (poolName) {
263 IdPool idPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
264 List<ChildPools> childPoolList = idPool.getChildPools();
265 if (childPoolList != null) {
266 childPoolList.forEach(childPool -> deletePool(childPool.getChildPoolName()));
268 singleTxDB.syncDelete(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
270 // TODO return the Future from a TBD asyncDelete instead.. BUT check that all callers @CheckReturnValue
271 return Futures.immediateFuture((DeleteIdPoolOutput) null);
276 public ListenableFuture<RpcResult<ReleaseIdOutput>> releaseId(ReleaseIdInput input) {
277 String poolName = input.getPoolName();
278 String idKey = input.getIdKey();
279 String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
280 return FutureRpcResults.fromListenableFuture(LOG, "releaseId", input, () -> {
281 idUtils.lock(lockManager, uniqueKey);
282 releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
283 // TODO return the Future from releaseIdFromLocalPool() instead.. check all callers @CheckReturnValue
284 return Futures.immediateFuture((ReleaseIdOutput) null);
285 }).onFailureLogLevel(org.opendaylight.serviceutils.tools.mdsal.rpc.FutureRpcResults.LogLevel.NONE)
287 if (e instanceof IdDoesNotExistException) {
288 // Do not log full stack trace in case ID does not exist
289 LOG.error("RPC releaseId() failed due to IdDoesNotExistException; input = {}", input);
291 // But for all other cases do:
292 LOG.error("RPC releaseId() failed; input = {}", input, e);
294 idUtils.unlock(lockManager, uniqueKey);
298 private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
299 String idKey, long size) throws OperationFailedException, IdManagerException {
300 LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
301 String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
302 CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
303 CompletableFuture<List<Long>> existingFutureIdValue =
304 idUtils.putAllocatedIdsIfAbsent(uniqueIdKey, futureIdValues);
305 if (existingFutureIdValue != null) {
307 return existingFutureIdValue.get();
308 } catch (InterruptedException | ExecutionException e) {
309 LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
310 idKey, parentPoolName);
311 throw new IdManagerException(e.getMessage(), e);
315 List<Long> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
317 if (!newIdValuesList.isEmpty()) {
318 return newIdValuesList;
320 //This get will not help in concurrent reads. Hence the same read needs to be done again.
321 IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
322 LOG.debug("Got pool {}", localIdPool);
323 long newIdValue = -1;
324 localPoolName = localPoolName.intern();
326 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
327 newIdValuesList.add(newIdValue);
329 getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
331 LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
332 idUtils.putReleaseIdLatch(uniqueIdKey, new CountDownLatch(1));
333 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, txRunner,
334 idUtils, lockManager);
335 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
336 futureIdValues.complete(newIdValuesList);
337 return newIdValuesList;
338 } catch (OperationFailedException | IdManagerException e) {
339 idUtils.unlock(lockManager, uniqueIdKey);
344 private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
345 throws IdManagerException {
347 IdHolder availableIds = localIdPool.getAvailableIds();
348 if (availableIds != null) {
349 Optional<Long> availableId = availableIds.allocateId();
350 if (availableId.isPresent()) {
351 IdHolderSyncJob poolSyncJob =
352 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(), txRunner,
354 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
355 return availableId.get();
358 IdHolder releasedIds = localIdPool.getReleasedIds();
359 Optional<Long> releasedId = releasedIds.allocateId();
360 if (releasedId.isPresent()) {
361 IdHolderSyncJob poolSyncJob =
362 new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), txRunner,
364 jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
365 return releasedId.get();
367 long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
369 if (LOG.isDebugEnabled()) {
370 LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
372 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
378 * Changes made to availableIds and releasedIds will not be persisted to the datastore.
380 private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
381 throws IdManagerException {
382 if (LOG.isDebugEnabled()) {
383 LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
385 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
386 parentPoolName = parentPoolName.intern();
387 idUtils.lock(lockManager, parentPoolName);
389 // Check if the childpool already got id block.
390 long availableIdCount =
391 localIdPool.getAvailableIds().getAvailableIdCount()
392 + localIdPool.getReleasedIds().getAvailableIdCount();
393 if (availableIdCount > 0) {
394 return availableIdCount;
396 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
397 Optional<IdPool> parentIdPool = confTx.read(idPoolInstanceIdentifier).get();
398 if (parentIdPool.isPresent()) {
399 return allocateIdBlockFromParentPool(localIdPool, parentIdPool.get(), confTx);
401 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
402 idPoolInstanceIdentifier);
405 } catch (InterruptedException | ExecutionException e) {
406 throw new IdManagerException("Error getting id block from parent pool", e);
408 idUtils.unlock(lockManager, parentPoolName);
412 private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool,
413 TypedWriteTransaction<Configuration> confTx)
414 throws OperationFailedException, IdManagerException {
416 ReleasedIdsHolderBuilder releasedIdsBuilderParent = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
418 idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
422 idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool,
427 idCount = getIdsFromOtherChildPools(releasedIdsBuilderParent, parentIdPool);
429 if (LOG.isDebugEnabled()) {
430 LOG.debug("Unable to allocate Id block from global pool");
432 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentIdPool.getPoolName()));
437 private long getIdsFromOtherChildPools(ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool)
438 throws OperationFailedException {
439 List<ChildPools> childPoolsList = nullToEmpty(parentIdPool.getChildPools());
440 // Sorting the child pools on last accessed time so that the pool that
441 // was not accessed for a long time comes first.
442 childPoolsList.sort(comparing(ChildPools::getLastAccessTime));
443 long currentTime = System.currentTimeMillis() / 1000;
444 for (ChildPools childPools : childPoolsList) {
445 if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
448 if (!Objects.equals(childPools.getChildPoolName(), idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
449 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
450 .getIdPoolInstance(childPools.getChildPoolName());
451 IdPool otherChildPool =
452 singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier);
453 ReleasedIdsHolderBuilder releasedIds = IdUtils.getReleaseIdsHolderBuilder(otherChildPool);
455 List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
456 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
457 if (delayedIdEntriesParent == null) {
458 delayedIdEntriesParent = new LinkedList<>();
460 delayedIdEntriesParent.addAll(delayedIdEntriesChild);
461 delayedIdEntriesChild.clear();
463 AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
464 while (idUtils.isIdAvailable(availableIds)) {
465 long cursor = availableIds.getCursor() + 1;
466 delayedIdEntriesParent.add(idUtils.createDelayedIdEntry(cursor, currentTime));
467 availableIds.setCursor(cursor);
470 long totalAvailableIdCount = releasedIds.getDelayedIdEntries().size()
471 + idUtils.getAvailableIdsCount(availableIds);
472 long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
473 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
474 singleTxDB.syncUpdate(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier,
475 new IdPoolBuilder().withKey(new IdPoolKey(otherChildPool.getPoolName()))
476 .setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
478 return totalAvailableIdCount;
484 private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
485 ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool,
486 TypedWriteTransaction<Configuration> confTx) {
487 if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
488 LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
491 List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
492 int idCount = Math.min(delayedIdEntriesParent.size(), parentIdPool.getBlockSize());
493 List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
494 ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
495 List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
496 List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
498 .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
499 .getId(), delayedIdEntry.getReadyTimeSec()))
500 .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
501 .collect(toCollection(ArrayList::new));
502 delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
503 releasedIds.replaceDelayedEntries(delayedIdEntriesFromParentPool);
504 releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
505 localIdPool.setReleasedIds(releasedIds);
506 delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
507 releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent);
508 InstanceIdentifier<ReleasedIdsHolder> releasedIdsHolderInstanceIdentifier = InstanceIdentifier
509 .builder(IdPools.class).child(IdPool.class,
510 new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
511 releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
512 LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
513 confTx.merge(releasedIdsHolderInstanceIdentifier, releasedIdsBuilderParent.build(), CREATE_MISSING_PARENTS);
517 private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
518 TypedWriteTransaction<Configuration> confTx) {
520 AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
521 long end = availableIdsBuilderParent.getEnd();
522 long cur = availableIdsBuilderParent.getCursor();
523 if (!idUtils.isIdAvailable(availableIdsBuilderParent)) {
524 if (LOG.isDebugEnabled()) {
525 LOG.debug("Ids exhausted in parent pool {}", parentIdPool);
529 // Update availableIdsHolder of Local Pool
530 idCount = Math.min(end - cur, parentIdPool.getBlockSize());
531 AvailableIdHolder availableIds = new AvailableIdHolder(idUtils, cur + 1, cur + idCount);
532 localIdPool.setAvailableIds(availableIds);
533 // Update availableIdsHolder of Global Pool
534 InstanceIdentifier<AvailableIdsHolder> availableIdsHolderInstanceIdentifier = InstanceIdentifier
535 .builder(IdPools.class).child(IdPool.class,
536 new IdPoolKey(parentIdPool.getPoolName())).child(AvailableIdsHolder.class).build();
537 availableIdsBuilderParent.setCursor(cur + idCount);
538 if (LOG.isDebugEnabled()) {
539 LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
541 confTx.merge(availableIdsHolderInstanceIdentifier, availableIdsBuilderParent.build(), CREATE_MISSING_PARENTS);
545 private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
546 throws ReadFailedException, IdManagerException {
547 String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
548 LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
549 CountDownLatch latch = idUtils.getReleaseIdLatch(idLatchKey);
552 if (!latch.await(10, TimeUnit.SECONDS)) {
553 LOG.warn("Timed out while releasing id {} from id pool {}", idKey, parentPoolName);
555 } catch (InterruptedException ignored) {
556 LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
558 idUtils.removeReleaseIdLatch(idLatchKey);
561 localPoolName = localPoolName.intern();
562 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
563 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier);
564 List<IdEntries> idEntries = parentIdPool.getIdEntries();
565 if (idEntries == null) {
566 throw new IdDoesNotExistException(parentPoolName, idKey);
568 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
569 Optional<IdEntries> existingIdEntryObject =
570 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
571 if (!existingIdEntryObject.isPresent()) {
572 LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
573 idUtils.unlock(lockManager, idLatchKey);
576 IdEntries existingIdEntry = existingIdEntryObject.get();
577 List<Long> idValuesList = nullToEmpty(existingIdEntry.getIdValue());
578 IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
579 boolean isRemoved = idEntries.remove(existingIdEntry);
580 LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
581 updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
582 IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), txRunner,
584 jobCoordinator.enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
585 scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
586 LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
587 // Updating id entries in the parent pool. This will be used for restart scenario
588 UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, txRunner, idUtils,
590 jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
593 private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
594 final String parentPoolName, final int blockSize) {
595 TimerTask scheduledTask = new TimerTask() {
599 new CleanUpJob(localIdPoolCache, txRunner, broker, parentPoolName, blockSize, lockManager,
600 idUtils, jobCoordinator);
601 jobCoordinator.enqueueJob(localIdPoolCache.getPoolName(), job, IdUtils.RETRY_COUNT);
604 cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
607 private IdPool createGlobalPool(TypedReadWriteTransaction<Configuration> confTx, String poolName, long low,
608 long high, long blockSize) throws IdManagerException {
610 InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
612 Optional<IdPool> existingIdPool = confTx.read(idPoolInstanceIdentifier).get();
613 if (!existingIdPool.isPresent()) {
614 if (LOG.isDebugEnabled()) {
615 LOG.debug("Creating new global pool {}", poolName);
617 idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
618 confTx.put(idPoolInstanceIdentifier, idPool, CREATE_MISSING_PARENTS);
620 idPool = existingIdPool.get();
621 if (LOG.isDebugEnabled()) {
622 LOG.debug("GlobalPool exists {}", idPool);
626 } catch (ExecutionException | InterruptedException e) {
627 throw new IdManagerException("Error retrieving the existing id pool for " + poolName, e);
631 private IdLocalPool createLocalPool(TypedWriteTransaction<Configuration> confTx, String localPoolName,
633 throws OperationFailedException, IdManagerException {
634 localPoolName = localPoolName.intern();
635 IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
636 allocateIdBlockFromParentPool(idLocalPool, idPool, confTx);
637 String parentPool = idPool.getPoolName();
638 localPool.put(parentPool, idLocalPool);
639 LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, txRunner, idPool.getPoolName(),
640 idPool.getBlockSize(), idUtils);
641 jobCoordinator.enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
645 private void deletePool(String poolName) {
646 LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, txRunner, idUtils);
647 jobCoordinator.enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
650 public void poolDeleted(String parentPoolName, String poolName) {
651 IdLocalPool idLocalPool = localPool.get(parentPoolName);
652 if (idLocalPool != null) {
653 if (idLocalPool.getPoolName().equals(poolName)) {
654 localPool.remove(parentPoolName);
659 private void updateDelayedEntriesInLocalCache(List<Long> idsList, String parentPoolName,
660 IdLocalPool localPoolCache) {
661 for (long idValue : idsList) {
662 localPoolCache.getReleasedIds().addId(idValue);
664 localPool.put(parentPoolName, localPoolCache);
667 private List<Long> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
668 CompletableFuture<List<Long>> futureIdValues, boolean hasExistingFutureIdValues)
669 throws IdManagerException, ReadFailedException {
670 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
671 InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
672 idUtils.lock(lockManager, uniqueIdKey);
673 List<Long> newIdValuesList = new ArrayList<>();
674 Optional<IdEntries> existingIdEntry =
675 singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
676 if (existingIdEntry.isPresent()) {
677 newIdValuesList = existingIdEntry.get().getIdValue();
678 LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
679 // Inform other waiting threads about this new value.
680 futureIdValues.complete(newIdValuesList);
681 // This is to avoid stale entries in the map. If this thread had populated the map,
682 // then the entry should be removed.
683 if (!hasExistingFutureIdValues) {
684 idUtils.removeAllocatedIds(uniqueIdKey);
686 idUtils.unlock(lockManager, uniqueIdKey);
687 return newIdValuesList;
689 return newIdValuesList;
692 private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
693 throws IdManagerException, ReadFailedException {
694 IdLocalPool localIdPool = localPool.get(parentPoolName);
695 if (localIdPool == null) {
696 idUtils.lock(lockManager, parentPoolName);
698 // Check if a previous thread that got the cluster-wide lock
699 // first, has created the localPool
700 InstanceIdentifier<IdPool> childIdPoolInstanceIdentifier = idUtils
701 .getIdPoolInstance(localPoolName);
702 IdPool childIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION,
703 childIdPoolInstanceIdentifier);
704 if (childIdPool != null) {
705 updateLocalIdPoolCache(childIdPool, parentPoolName);
707 if (localPool.get(parentPoolName) == null) {
709 return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
710 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
711 .getIdPoolInstance(parentPoolName);
712 Optional<IdPool> parentIdPool = confTx.read(parentIdPoolInstanceIdentifier).get();
713 if (parentIdPool.isPresent()) {
714 // Return localIdPool
715 return createLocalPool(confTx, localPoolName, parentIdPool.get());
717 throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
718 parentIdPoolInstanceIdentifier);
721 } catch (InterruptedException | ExecutionException e) {
722 throw new IdManagerException("Error creating a local id pool", e);
725 localIdPool = localPool.get(parentPoolName);
728 idUtils.unlock(lockManager, parentPoolName);
734 private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Long> newIdValuesList,
735 IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
736 InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
737 IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier1);
738 long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
739 + localIdPool.getReleasedIds().getAvailableIdCount();
740 AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
741 ReleasedIdsHolderBuilder releasedParentIds = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
742 totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
743 + idUtils.getAvailableIdsCount(availableParentIds);
744 if (totalAvailableIdCount > size) {
747 newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
748 } catch (IdManagerException e) {
749 if (LOG.isDebugEnabled()) {
750 LOG.debug("Releasing IDs to pool {}", localPoolName);
752 // Releasing the IDs added in newIdValuesList since
753 // a null list would be returned now, as the
754 // requested size of list IDs exceeds the number of
756 updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
758 newIdValuesList.add(newIdValue);
762 throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));