* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.genius.idmanager;
-import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toCollection;
+import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
+
+import org.apache.aries.blueprint.annotation.service.Reference;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
+import org.opendaylight.daexim.DataImportBootReady;
+import org.opendaylight.genius.datastoreutils.ExpectedDataObjectNotFoundException;
import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
import org.opendaylight.genius.idmanager.ReleasedIdHolder.DelayedIdEntry;
import org.opendaylight.genius.idmanager.api.IdManagerMonitor;
import org.opendaylight.genius.idmanager.jobs.LocalPoolCreateJob;
import org.opendaylight.genius.idmanager.jobs.LocalPoolDeleteJob;
import org.opendaylight.genius.idmanager.jobs.UpdateIdEntryJob;
+import org.opendaylight.genius.infra.Datastore.Configuration;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
+import org.opendaylight.genius.infra.TypedWriteTransaction;
+import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.serviceutils.tools.mdsal.rpc.FutureRpcResults;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.AllocateIdRangeOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.CreateIdPoolOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.DeleteIdPoolOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdPools;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.ReleaseIdOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPool;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.id.pools.IdPoolKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.lockmanager.rev160413.LockManagerService;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.OperationFailedException;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final long DEFAULT_IDLE_TIME = 24 * 60 * 60;
private final DataBroker broker;
+ private final ManagedNewTransactionRunner txRunner;
private final SingleTransactionDataBroker singleTxDB;
private final LockManagerService lockManager;
private final IdUtils idUtils;
+ private final JobCoordinator jobCoordinator;
private final ConcurrentMap<String, IdLocalPool> localPool;
private final Timer cleanJobTimer = new Timer();
@Inject
- public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils) throws ReadFailedException {
+ public IdManager(DataBroker db, LockManagerService lockManager, IdUtils idUtils,
+ @Reference DataImportBootReady dataImportBootReady, JobCoordinator jobCoordinator)
+ throws ReadFailedException {
this.broker = db;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(db);
this.singleTxDB = new SingleTransactionDataBroker(db);
this.lockManager = lockManager;
this.idUtils = idUtils;
+ this.jobCoordinator = jobCoordinator;
+
+ // NB: We do not "use" the DataImportBootReady, but it's presence in the OSGi
+ // Service Registry is the required "signal" that the Daexim "import on boot"
+ // has fully completed (which we want to wait for). Therefore, making this
+ // dependent on that defers the Blueprint initialization, as we'd like to,
+ // so that we do not start giving out new IDs before an import went in.
+ // Thus, please DO NOT remove the DataImportBootReady argument, even if
+ // it appears to be (is) un-used from a Java code PoV!
+
this.localPool = new ConcurrentHashMap<>();
populateCache();
}
@Override
public Map<String, String> getLocalPoolsDetails() {
Map<String, String> map = new HashMap<>();
- localPool.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue().toString()));
+ localPool.forEach((key, value) -> map.put(key, value.toString()));
return map;
}
}
@PreDestroy
- public void close() throws Exception {
+ public void close() {
+ cleanJobTimer.cancel();
+
LOG.info("{} close", getClass().getSimpleName());
}
private void populateCache() throws ReadFailedException {
// If IP changes during reboot, then there will be orphaned child pools.
InstanceIdentifier<IdPools> idPoolsInstance = idUtils.getIdPools();
- Optional<IdPools> idPoolsOptional = singleTxDB.syncReadOptional(CONFIGURATION, idPoolsInstance);
+ Optional<IdPools> idPoolsOptional =
+ singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, idPoolsInstance);
if (!idPoolsOptional.isPresent()) {
return;
}
IdPools idPools = idPoolsOptional.get();
List<IdPool> idPoolList = idPools.getIdPool();
idPoolList
- .parallelStream()
+ .stream()
.filter(idPool -> idPool.getParentPoolName() != null
&& !idPool.getParentPoolName().isEmpty()
&& idUtils.getLocalPoolName(idPool.getParentPoolName())
ReleasedIdHolder releasedIdHolder = new ReleasedIdHolder(idUtils, releasedIdsHolder.getDelayedTimeSec());
releasedIdHolder.setAvailableIdCount(releasedIdsHolder.getAvailableIdCount());
List<DelayedIdEntries> delayedEntries = releasedIdsHolder.getDelayedIdEntries();
- List<DelayedIdEntry> delayedIdEntryInCache = new ArrayList<>();
- if (delayedEntries != null) {
- delayedIdEntryInCache = delayedEntries
- .parallelStream()
- .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
- .getId(), delayedIdEntry.getReadyTimeSec()))
- .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
- idEntry2.getReadyTimeSec())).collect(Collectors.toList());
- }
- releasedIdHolder.setDelayedEntries(delayedIdEntryInCache);
+ List<DelayedIdEntry> delayedIdEntryInCache = delayedEntries
+ .stream()
+ .map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
+ .getId(), delayedIdEntry.getReadyTimeSec()))
+ .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
+ .collect(toCollection(ArrayList::new));
+
+ releasedIdHolder.replaceDelayedEntries(delayedIdEntryInCache);
IdLocalPool idLocalPool = new IdLocalPool(idUtils, idPool.getPoolName());
idLocalPool.setAvailableIds(availableIdHolder);
}
@Override
- public Future<RpcResult<Void>> createIdPool(CreateIdPoolInput input) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("createIdPool called with input {}", input);
- }
- String poolName = input.getPoolName();
+ public ListenableFuture<RpcResult<CreateIdPoolOutput>> createIdPool(CreateIdPoolInput input) {
+ LOG.info("createIdPool called with input {}", input);
long low = input.getLow();
long high = input.getHigh();
long blockSize = idUtils.computeBlockSize(low, high);
- Future<RpcResult<Void>> futureResult;
- try {
- idUtils.lockPool(lockManager, poolName);
- WriteTransaction tx = broker.newWriteOnlyTransaction();
- poolName = poolName.intern();
- IdPool idPool;
- idPool = createGlobalPool(tx, poolName, low, high, blockSize);
- String localPoolName = idUtils.getLocalPoolName(poolName);
- IdLocalPool idLocalPool = localPool.get(poolName);
- if (idLocalPool == null) {
- createLocalPool(tx, localPoolName, idPool);
- idUtils.updateChildPool(tx, idPool.getPoolName(), localPoolName);
- }
- tx.submit().checkedGet();
- futureResult = RpcResultBuilder.<Void>success().buildFuture();
- } catch (OperationFailedException | IdManagerException e) {
- futureResult = buildFailedRpcResultFuture("createIdPool failed: " + input.toString(), e);
- } finally {
+ return FutureRpcResults.fromListenableFuture(LOG, "createIdPool", input, () -> {
+ String poolName = input.getPoolName().intern();
try {
- idUtils.unlockPool(lockManager, poolName);
- } catch (IdManagerException e) {
- futureResult = buildFailedRpcResultFuture("createIdPool unlockPool() failed: " + input.toString(), e);
+ idUtils.lock(lockManager, poolName);
+ return Futures.transform(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
+ IdPool idPool = createGlobalPool(confTx, poolName, low, high, blockSize);
+ String localPoolName = idUtils.getLocalPoolName(poolName);
+ IdLocalPool idLocalPool = localPool.get(poolName);
+ if (idLocalPool == null) {
+ createLocalPool(confTx, localPoolName, idPool);
+ idUtils.updateChildPool(confTx, idPool.getPoolName(), localPoolName);
+ }
+ }), unused -> new CreateIdPoolOutputBuilder().build(), MoreExecutors.directExecutor());
+ } finally {
+ idUtils.unlock(lockManager, poolName);
}
- }
- return futureResult;
+ }).build();
}
@Override
- public Future<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("AllocateId called with input {}", input);
- }
+ public ListenableFuture<RpcResult<AllocateIdOutput>> allocateId(AllocateIdInput input) {
String idKey = input.getIdKey();
String poolName = input.getPoolName();
- String localPoolName = idUtils.getLocalPoolName(poolName);
- long newIdValue = -1;
- AllocateIdOutputBuilder output = new AllocateIdOutputBuilder();
- Future<RpcResult<AllocateIdOutput>> futureResult;
- try {
- //allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
- newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
- output.setIdValue(newIdValue);
- futureResult = RpcResultBuilder.<AllocateIdOutput>success().withResult(output.build()).buildFuture();
- } catch (OperationFailedException | IdManagerException e) {
- futureResult = buildFailedRpcResultFuture("allocateId failed: " + input.toString(), e);
+ return FutureRpcResults.fromBuilder(LOG, "allocateId", input, () -> {
+ String localPoolName = idUtils.getLocalPoolName(poolName);
+ // allocateIdFromLocalPool method returns a list of IDs with one element. This element is obtained by get(0)
+ long newIdValue = allocateIdFromLocalPool(poolName, localPoolName, idKey, 1).get(0);
+ return new AllocateIdOutputBuilder().setIdValue(newIdValue);
+ }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
+ }
+
+ private void completeExceptionallyIfPresent(String poolName, String idKey, Throwable exception) {
+ CompletableFuture<List<Long>> completableFuture =
+ idUtils.removeAllocatedIds(idUtils.getUniqueKey(poolName, idKey));
+ if (completableFuture != null) {
+ completableFuture.completeExceptionally(exception);
}
- return futureResult;
}
@Override
- public Future<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("AllocateIdRange called with input {}", input);
- }
+ public ListenableFuture<RpcResult<AllocateIdRangeOutput>> allocateIdRange(AllocateIdRangeInput input) {
String idKey = input.getIdKey();
String poolName = input.getPoolName();
long size = input.getSize();
String localPoolName = idUtils.getLocalPoolName(poolName);
- List<Long> newIdValuesList = new ArrayList<>();
AllocateIdRangeOutputBuilder output = new AllocateIdRangeOutputBuilder();
- Future<RpcResult<AllocateIdRangeOutput>> futureResult;
- try {
- newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
+ return FutureRpcResults.fromBuilder(LOG, "allocateIdRange", input, () -> {
+ List<Long> newIdValuesList = allocateIdFromLocalPool(poolName, localPoolName, idKey, size);
Collections.sort(newIdValuesList);
output.setIdValues(newIdValuesList);
- futureResult = RpcResultBuilder.<AllocateIdRangeOutput>success().withResult(output.build()).buildFuture();
- } catch (OperationFailedException | IdManagerException e) {
- futureResult = buildFailedRpcResultFuture("allocateIdRange failed: " + input.toString(), e);
- }
- return futureResult;
+ return output;
+ }).onFailure(e -> completeExceptionallyIfPresent(poolName, idKey, e)).build();
}
@Override
- public Future<RpcResult<Void>> deleteIdPool(DeleteIdPoolInput input) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("DeleteIdPool called with input {}", input);
- }
- String poolName = input.getPoolName();
- Future<RpcResult<Void>> futureResult;
- try {
+ public ListenableFuture<RpcResult<DeleteIdPoolOutput>> deleteIdPool(DeleteIdPoolInput input) {
+ return FutureRpcResults.fromListenableFuture(LOG, "deleteIdPool", input, () -> {
+ String poolName = input.getPoolName().intern();
InstanceIdentifier<IdPool> idPoolToBeDeleted = idUtils.getIdPoolInstance(poolName);
- poolName = poolName.intern();
synchronized (poolName) {
- IdPool idPool = singleTxDB.syncRead(CONFIGURATION, idPoolToBeDeleted);
+ IdPool idPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
List<ChildPools> childPoolList = idPool.getChildPools();
if (childPoolList != null) {
- childPoolList.parallelStream().forEach(childPool -> deletePool(childPool.getChildPoolName()));
- }
- singleTxDB.syncDelete(CONFIGURATION, idPoolToBeDeleted);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleted id pool {}", poolName);
+ childPoolList.forEach(childPool -> deletePool(childPool.getChildPoolName()));
}
+ singleTxDB.syncDelete(LogicalDatastoreType.CONFIGURATION, idPoolToBeDeleted);
}
- futureResult = RpcResultBuilder.<Void>success().buildFuture();
- } catch (OperationFailedException e) {
- futureResult = buildFailedRpcResultFuture("deleteIdPool failed: " + input.toString(), e);
- }
- return futureResult;
+ // TODO return the Future from a TBD asyncDelete instead.. BUT check that all callers @CheckReturnValue
+ return Futures.immediateFuture((DeleteIdPoolOutput) null);
+ }).build();
}
@Override
- public Future<RpcResult<Void>> releaseId(ReleaseIdInput input) {
+ public ListenableFuture<RpcResult<ReleaseIdOutput>> releaseId(ReleaseIdInput input) {
String poolName = input.getPoolName();
String idKey = input.getIdKey();
- Future<RpcResult<Void>> futureResult;
- try {
+ String uniqueKey = idUtils.getUniqueKey(poolName, idKey);
+ return FutureRpcResults.fromListenableFuture(LOG, "releaseId", input, () -> {
+ idUtils.lock(lockManager, uniqueKey);
releaseIdFromLocalPool(poolName, idUtils.getLocalPoolName(poolName), idKey);
- futureResult = RpcResultBuilder.<Void>success().buildFuture();
- } catch (ReadFailedException | IdManagerException e) {
- futureResult = buildFailedRpcResultFuture("releaseId failed: " + input.toString(), e);
- }
- return futureResult;
- }
-
- private <T> ListenableFuture<RpcResult<T>> buildFailedRpcResultFuture(String msg, Exception exception) {
- LOG.error(msg, exception);
- RpcResultBuilder<T> failedRpcResultBuilder = RpcResultBuilder.failed();
- failedRpcResultBuilder.withError(ErrorType.APPLICATION, msg, exception);
- if (exception instanceof OperationFailedException) {
- failedRpcResultBuilder.withRpcErrors(((OperationFailedException) exception).getErrorList());
- }
- return failedRpcResultBuilder.buildFuture();
+ // TODO return the Future from releaseIdFromLocalPool() instead.. check all callers @CheckReturnValue
+ return Futures.immediateFuture((ReleaseIdOutput) null);
+ }).onFailureLogLevel(org.opendaylight.serviceutils.tools.mdsal.rpc.FutureRpcResults.LogLevel.NONE)
+ .onFailure(e -> {
+ if (e instanceof IdDoesNotExistException) {
+ // Do not log full stack trace in case ID does not exist
+ LOG.error("RPC releaseId() failed due to IdDoesNotExistException; input = {}", input);
+ } else {
+ // But for all other cases do:
+ LOG.error("RPC releaseId() failed; input = {}", input, e);
+ }
+ idUtils.unlock(lockManager, uniqueKey);
+ }).build();
}
- private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName, String idKey, long size)
- throws OperationFailedException, IdManagerException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName,
- idKey);
- }
- long newIdValue = -1;
- List<Long> newIdValuesList = new ArrayList<>();
- localPoolName = localPoolName.intern();
- InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
- InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
- Optional<IdEntries> existingIdEntry = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
- if (existingIdEntry.isPresent()) {
- newIdValuesList = existingIdEntry.get().getIdValue();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
- }
- return newIdValuesList;
- }
- IdLocalPool localIdPool = localPool.get(parentPoolName);
- if (localIdPool == null) {
- idUtils.lockPool(lockManager, parentPoolName);
+ private List<Long> allocateIdFromLocalPool(String parentPoolName, String localPoolName,
+ String idKey, long size) throws OperationFailedException, IdManagerException {
+ LOG.debug("Allocating id from local pool {}. Parent pool {}. Idkey {}", localPoolName, parentPoolName, idKey);
+ String uniqueIdKey = idUtils.getUniqueKey(parentPoolName, idKey);
+ CompletableFuture<List<Long>> futureIdValues = new CompletableFuture<>();
+ CompletableFuture<List<Long>> existingFutureIdValue =
+ idUtils.putAllocatedIdsIfAbsent(uniqueIdKey, futureIdValues);
+ if (existingFutureIdValue != null) {
try {
- WriteTransaction tx = broker.newWriteOnlyTransaction();
- IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
- localIdPool = createLocalPool(tx, localPoolName, parentIdPool); // Return localIdPool.....
- tx.submit().checkedGet();
- } finally {
- idUtils.unlockPool(lockManager, parentPoolName);
+ return existingFutureIdValue.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Could not obtain id from existing futureIdValue for idKey {} and pool {}.",
+ idKey, parentPoolName);
+ throw new IdManagerException(e.getMessage(), e);
}
}
- if (LOG.isDebugEnabled()) {
+ try {
+ List<Long> newIdValuesList = checkForIdInIdEntries(parentPoolName, idKey, uniqueIdKey, futureIdValues,
+ false);
+ if (!newIdValuesList.isEmpty()) {
+ return newIdValuesList;
+ }
+ //This get will not help in concurrent reads. Hence the same read needs to be done again.
+ IdLocalPool localIdPool = getOrCreateLocalIdPool(parentPoolName, localPoolName);
LOG.debug("Got pool {}", localIdPool);
- }
- if (size == 1) {
- newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
- newIdValuesList.add(newIdValue);
- } else {
- IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
- long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
- + localIdPool.getReleasedIds().getAvailableIdCount();
- AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
- ReleasedIdsHolderBuilder releasedParentIds = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
- totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
- + idUtils.getAvailableIdsCount(availableParentIds);
- if (totalAvailableIdCount > size) {
- while (size > 0) {
- try {
- newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
- } catch (OperationFailedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Releasing IDs to pool {}", localPoolName);
- }
- // Releasing the IDs added in newIdValuesList since a null list would be returned now, as the
- // requested size of list IDs exceeds the number of available IDs.
- updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
- }
- newIdValuesList.add(newIdValue);
- size--;
- }
+ long newIdValue = -1;
+ localPoolName = localPoolName.intern();
+ if (size == 1) {
+ newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
+ newIdValuesList.add(newIdValue);
} else {
- return null;
+ getRangeOfIds(parentPoolName, localPoolName, size, newIdValuesList, localIdPool, newIdValue);
}
- }
- if (LOG.isDebugEnabled()) {
LOG.debug("The newIdValues {} for the idKey {}", newIdValuesList, idKey);
+ idUtils.putReleaseIdLatch(uniqueIdKey, new CountDownLatch(1));
+ UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, txRunner,
+ idUtils, lockManager);
+ jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
+ futureIdValues.complete(newIdValuesList);
+ return newIdValuesList;
+ } catch (OperationFailedException | IdManagerException e) {
+ idUtils.unlock(lockManager, uniqueIdKey);
+ throw e;
}
- UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, newIdValuesList, broker,
- idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
- return newIdValuesList;
}
private Long getIdFromLocalPoolCache(IdLocalPool localIdPool, String parentPoolName)
- throws OperationFailedException, IdManagerException {
+ throws IdManagerException {
while (true) {
- IdHolder releasedIds = localIdPool.getReleasedIds();
- Optional<Long> releasedId = releasedIds.allocateId();
- if (releasedId.isPresent()) {
- IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), releasedIds, broker,
- idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
- IdUtils.RETRY_COUNT);
- return releasedId.get();
- }
IdHolder availableIds = localIdPool.getAvailableIds();
if (availableIds != null) {
Optional<Long> availableId = availableIds.allocateId();
if (availableId.isPresent()) {
- IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localIdPool.getPoolName(), availableIds, broker,
- idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(localIdPool.getPoolName(), poolSyncJob,
- IdUtils.RETRY_COUNT);
+ IdHolderSyncJob poolSyncJob =
+ new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getAvailableIds(), txRunner,
+ idUtils);
+ jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
return availableId.get();
}
}
+ IdHolder releasedIds = localIdPool.getReleasedIds();
+ Optional<Long> releasedId = releasedIds.allocateId();
+ if (releasedId.isPresent()) {
+ IdHolderSyncJob poolSyncJob =
+ new IdHolderSyncJob(localIdPool.getPoolName(), localIdPool.getReleasedIds(), txRunner,
+ idUtils);
+ jobCoordinator.enqueueJob(localIdPool.getPoolName(), poolSyncJob, IdUtils.RETRY_COUNT);
+ return releasedId.get();
+ }
long idCount = getIdBlockFromParentPool(parentPoolName, localIdPool);
if (idCount <= 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Unable to allocate Id block from global pool");
+ LOG.debug("Unable to allocate Id block from global pool {}", parentPoolName);
}
throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
}
* Changes made to availableIds and releasedIds will not be persisted to the datastore.
*/
private long getIdBlockFromParentPool(String parentPoolName, IdLocalPool localIdPool)
- throws OperationFailedException, IdManagerException {
+ throws IdManagerException {
if (LOG.isDebugEnabled()) {
LOG.debug("Allocating block of id from parent pool {}", parentPoolName);
}
InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
parentPoolName = parentPoolName.intern();
- idUtils.lockPool(lockManager, parentPoolName);
+ idUtils.lock(lockManager, parentPoolName);
try {
// Check if the childpool already got id block.
long availableIdCount =
if (availableIdCount > 0) {
return availableIdCount;
}
- WriteTransaction tx = broker.newWriteOnlyTransaction();
- IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
- long idCount = allocateIdBlockFromParentPool(localIdPool, parentIdPool, tx);
- tx.submit().checkedGet();
- return idCount;
+ return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
+ Optional<IdPool> parentIdPool = confTx.read(idPoolInstanceIdentifier).get();
+ if (parentIdPool.isPresent()) {
+ return allocateIdBlockFromParentPool(localIdPool, parentIdPool.get(), confTx);
+ } else {
+ throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
+ idPoolInstanceIdentifier);
+ }
+ }).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IdManagerException("Error getting id block from parent pool", e);
} finally {
- idUtils.unlockPool(lockManager, parentPoolName);
+ idUtils.unlock(lockManager, parentPoolName);
}
}
- private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool, WriteTransaction tx)
+ private long allocateIdBlockFromParentPool(IdLocalPool localPoolCache, IdPool parentIdPool,
+ TypedWriteTransaction<Configuration> confTx)
throws OperationFailedException, IdManagerException {
- long idCount = -1;
- ReleasedIdsHolderBuilder releasedIdsBuilderParent = idUtils.getReleaseIdsHolderBuilder(parentIdPool);
+ long idCount;
+ ReleasedIdsHolderBuilder releasedIdsBuilderParent = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
while (true) {
- idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool, tx);
+ idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, confTx);
if (idCount > 0) {
return idCount;
}
- idCount = allocateIdBlockFromAvailableIdsHolder(localPoolCache, parentIdPool, tx);
+ idCount = allocateIdBlockFromReleasedIdsHolder(localPoolCache, releasedIdsBuilderParent, parentIdPool,
+ confTx);
if (idCount > 0) {
return idCount;
}
List<ChildPools> childPoolsList = parentIdPool.getChildPools();
// Sorting the child pools on last accessed time so that the pool that
// was not accessed for a long time comes first.
- Collections.sort(childPoolsList,
- (childPool1, childPool2) -> childPool1.getLastAccessTime().compareTo(childPool2.getLastAccessTime()));
+ childPoolsList.sort(comparing(ChildPools::getLastAccessTime));
long currentTime = System.currentTimeMillis() / 1000;
for (ChildPools childPools : childPoolsList) {
if (childPools.getLastAccessTime() + DEFAULT_IDLE_TIME > currentTime) {
if (!childPools.getChildPoolName().equals(idUtils.getLocalPoolName(parentIdPool.getPoolName()))) {
InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils
.getIdPoolInstance(childPools.getChildPoolName());
- IdPool otherChildPool = singleTxDB.syncRead(CONFIGURATION, idPoolInstanceIdentifier);
- ReleasedIdsHolderBuilder releasedIds = idUtils.getReleaseIdsHolderBuilder(otherChildPool);
+ IdPool otherChildPool =
+ singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier);
+ ReleasedIdsHolderBuilder releasedIds = IdUtils.getReleaseIdsHolderBuilder(otherChildPool);
List<DelayedIdEntries> delayedIdEntriesChild = releasedIds.getDelayedIdEntries();
List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
delayedIdEntriesParent = new LinkedList<>();
}
delayedIdEntriesParent.addAll(delayedIdEntriesChild);
- delayedIdEntriesChild.removeAll(delayedIdEntriesChild);
+ delayedIdEntriesChild.clear();
AvailableIdsHolderBuilder availableIds = idUtils.getAvailableIdsHolderBuilder(otherChildPool);
while (idUtils.isIdAvailable(availableIds)) {
+ idUtils.getAvailableIdsCount(availableIds);
long count = releasedIdsBuilderParent.getAvailableIdCount() + totalAvailableIdCount;
releasedIdsBuilderParent.setDelayedIdEntries(delayedIdEntriesParent).setAvailableIdCount(count);
- singleTxDB.syncUpdate(CONFIGURATION, idPoolInstanceIdentifier,
- new IdPoolBuilder().setKey(new IdPoolKey(otherChildPool.getPoolName()))
+ singleTxDB.syncUpdate(LogicalDatastoreType.CONFIGURATION, idPoolInstanceIdentifier,
+ new IdPoolBuilder().withKey(new IdPoolKey(otherChildPool.getPoolName()))
.setAvailableIdsHolder(availableIds.build()).setReleasedIdsHolder(releasedIds.build())
.build());
return totalAvailableIdCount;
}
private long allocateIdBlockFromReleasedIdsHolder(IdLocalPool localIdPool,
- ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool, WriteTransaction tx) {
+ ReleasedIdsHolderBuilder releasedIdsBuilderParent, IdPool parentIdPool,
+ TypedWriteTransaction<Configuration> confTx) {
if (releasedIdsBuilderParent.getAvailableIdCount() == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
- }
+ LOG.debug("Ids unavailable in releasedIds of parent pool {}", parentIdPool);
return 0;
}
List<DelayedIdEntries> delayedIdEntriesParent = releasedIdsBuilderParent.getDelayedIdEntries();
List<DelayedIdEntries> idEntriesToBeRemoved = delayedIdEntriesParent.subList(0, idCount);
ReleasedIdHolder releasedIds = (ReleasedIdHolder) localIdPool.getReleasedIds();
List<DelayedIdEntry> delayedIdEntriesLocalCache = releasedIds.getDelayedEntries();
- delayedIdEntriesLocalCache = idEntriesToBeRemoved
- .parallelStream()
+ List<DelayedIdEntry> delayedIdEntriesFromParentPool = idEntriesToBeRemoved
+ .stream()
.map(delayedIdEntry -> new DelayedIdEntry(delayedIdEntry
.getId(), delayedIdEntry.getReadyTimeSec()))
- .sorted((idEntry1, idEntry2) -> Long.compare(idEntry1.getReadyTimeSec(),
- idEntry2.getReadyTimeSec())).collect(Collectors.toList());
- releasedIds.setDelayedEntries(delayedIdEntriesLocalCache);
+ .sorted(comparing(DelayedIdEntry::getReadyTimeSec))
+ .collect(toCollection(ArrayList::new));
+ delayedIdEntriesFromParentPool.addAll(delayedIdEntriesLocalCache);
+ releasedIds.replaceDelayedEntries(delayedIdEntriesFromParentPool);
releasedIds.setAvailableIdCount(releasedIds.getAvailableIdCount() + idCount);
localIdPool.setReleasedIds(releasedIds);
delayedIdEntriesParent.removeAll(idEntriesToBeRemoved);
.builder(IdPools.class).child(IdPool.class,
new IdPoolKey(parentIdPool.getPoolName())).child(ReleasedIdsHolder.class).build();
releasedIdsBuilderParent.setAvailableIdCount(releasedIdsBuilderParent.getAvailableIdCount() - idCount);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
- }
- tx.merge(CONFIGURATION, releasedIdsHolderInstanceIdentifier,
- releasedIdsBuilderParent.build(), true);
+ LOG.debug("Allocated {} ids from releasedIds of parent pool {}", idCount, parentIdPool);
+ confTx.merge(releasedIdsHolderInstanceIdentifier, releasedIdsBuilderParent.build(), CREATE_MISSING_PARENTS);
return idCount;
}
private long allocateIdBlockFromAvailableIdsHolder(IdLocalPool localIdPool, IdPool parentIdPool,
- WriteTransaction tx) {
+ TypedWriteTransaction<Configuration> confTx) {
long idCount = 0;
AvailableIdsHolderBuilder availableIdsBuilderParent = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
long end = availableIdsBuilderParent.getEnd();
if (LOG.isDebugEnabled()) {
LOG.debug("Allocated {} ids from availableIds of global pool {}", idCount, parentIdPool);
}
- tx.merge(CONFIGURATION, availableIdsHolderInstanceIdentifier,
- availableIdsBuilderParent.build(), true);
+ confTx.merge(availableIdsHolderInstanceIdentifier, availableIdsBuilderParent.build(), CREATE_MISSING_PARENTS);
return idCount;
}
private void releaseIdFromLocalPool(String parentPoolName, String localPoolName, String idKey)
throws ReadFailedException, IdManagerException {
+ String idLatchKey = idUtils.getUniqueKey(parentPoolName, idKey);
+ LOG.debug("Releasing ID {} from pool {}", idKey, localPoolName);
+ CountDownLatch latch = idUtils.getReleaseIdLatch(idLatchKey);
+ if (latch != null) {
+ try {
+ if (!latch.await(10, TimeUnit.SECONDS)) {
+ LOG.warn("Timed out while releasing id {} from id pool {}", idKey, parentPoolName);
+ }
+ } catch (InterruptedException ignored) {
+ LOG.warn("Thread interrupted while releasing id {} from id pool {}", idKey, parentPoolName);
+ } finally {
+ idUtils.removeReleaseIdLatch(idLatchKey);
+ }
+ }
localPoolName = localPoolName.intern();
InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
- IdPool parentIdPool = singleTxDB.syncRead(CONFIGURATION, parentIdPoolInstanceIdentifier);
+ IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier);
List<IdEntries> idEntries = parentIdPool.getIdEntries();
- List<IdEntries> newIdEntries = idEntries;
if (idEntries == null) {
- throw new IdManagerException("Id Entries does not exist");
+ throw new IdDoesNotExistException(parentPoolName, idKey);
}
InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
- Optional<IdEntries> existingIdEntryObject = singleTxDB.syncReadOptional(CONFIGURATION, existingId);
+ Optional<IdEntries> existingIdEntryObject =
+ singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
if (!existingIdEntryObject.isPresent()) {
- throw new IdManagerException(
- String.format("Specified Id key %s does not exist in id pool %s", idKey, parentPoolName));
+ LOG.info("Specified Id key {} does not exist in id pool {}", idKey, parentPoolName);
+ idUtils.unlock(lockManager, idLatchKey);
+ return;
}
IdEntries existingIdEntry = existingIdEntryObject.get();
List<Long> idValuesList = existingIdEntry.getIdValue();
IdLocalPool localIdPoolCache = localPool.get(parentPoolName);
- boolean isRemoved = newIdEntries.remove(existingIdEntry);
- if (LOG.isDebugEnabled()) {
- LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
- }
+ boolean isRemoved = idEntries.remove(existingIdEntry);
+ LOG.debug("The entry {} is removed {}", existingIdEntry, isRemoved);
updateDelayedEntriesInLocalCache(idValuesList, parentPoolName, localIdPoolCache);
- IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), broker,
+ IdHolderSyncJob poolSyncJob = new IdHolderSyncJob(localPoolName, localIdPoolCache.getReleasedIds(), txRunner,
idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
+ jobCoordinator.enqueueJob(localPoolName, poolSyncJob, IdUtils.RETRY_COUNT);
scheduleCleanUpTask(localIdPoolCache, parentPoolName, parentIdPool.getBlockSize());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
- }
+ LOG.debug("Released id ({}, {}) from pool {}", idKey, idValuesList, localPoolName);
// Updating id entries in the parent pool. This will be used for restart scenario
- UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, broker, idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
+ UpdateIdEntryJob job = new UpdateIdEntryJob(parentPoolName, localPoolName, idKey, null, txRunner, idUtils,
+ lockManager);
+ jobCoordinator.enqueueJob(parentPoolName, job, IdUtils.RETRY_COUNT);
}
private void scheduleCleanUpTask(final IdLocalPool localIdPoolCache,
TimerTask scheduledTask = new TimerTask() {
@Override
public void run() {
- CleanUpJob job = new CleanUpJob(localIdPoolCache, broker, parentPoolName, blockSize, lockManager,
- idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(localIdPoolCache.getPoolName(), job,
- IdUtils.RETRY_COUNT);
+ CleanUpJob job =
+ new CleanUpJob(localIdPoolCache, txRunner, broker, parentPoolName, blockSize, lockManager,
+ idUtils, jobCoordinator);
+ jobCoordinator.enqueueJob(localIdPoolCache.getPoolName(), job, IdUtils.RETRY_COUNT);
}
};
cleanJobTimer.schedule(scheduledTask, IdUtils.DEFAULT_DELAY_TIME * 1000);
}
- private IdPool createGlobalPool(WriteTransaction tx, String poolName, long low, long high, long blockSize)
- throws ReadFailedException {
+ private IdPool createGlobalPool(TypedReadWriteTransaction<Configuration> confTx, String poolName, long low,
+ long high, long blockSize) throws IdManagerException {
IdPool idPool;
InstanceIdentifier<IdPool> idPoolInstanceIdentifier = idUtils.getIdPoolInstance(poolName);
- Optional<IdPool> existingIdPool = singleTxDB.syncReadOptional(CONFIGURATION, idPoolInstanceIdentifier);
- if (!existingIdPool.isPresent()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new global pool {}", poolName);
- }
- idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
- tx.put(CONFIGURATION, idPoolInstanceIdentifier, idPool, true);
- } else {
- idPool = existingIdPool.get();
- if (LOG.isDebugEnabled()) {
- LOG.debug("GlobalPool exists {}", idPool);
+ try {
+ Optional<IdPool> existingIdPool = confTx.read(idPoolInstanceIdentifier).get();
+ if (!existingIdPool.isPresent()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new global pool {}", poolName);
+ }
+ idPool = idUtils.createGlobalPool(poolName, low, high, blockSize);
+ confTx.put(idPoolInstanceIdentifier, idPool, CREATE_MISSING_PARENTS);
+ } else {
+ idPool = existingIdPool.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GlobalPool exists {}", idPool);
+ }
}
+ return idPool;
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IdManagerException("Error retrieving the existing id pool for " + poolName, e);
}
- return idPool;
}
- private IdLocalPool createLocalPool(WriteTransaction tx, String localPoolName, IdPool idPool)
+ private IdLocalPool createLocalPool(TypedWriteTransaction<Configuration> confTx, String localPoolName,
+ IdPool idPool)
throws OperationFailedException, IdManagerException {
localPoolName = localPoolName.intern();
IdLocalPool idLocalPool = new IdLocalPool(idUtils, localPoolName);
- allocateIdBlockFromParentPool(idLocalPool, idPool, tx);
+ allocateIdBlockFromParentPool(idLocalPool, idPool, confTx);
String parentPool = idPool.getPoolName();
localPool.put(parentPool, idLocalPool);
- LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, broker, idPool.getPoolName(),
+ LocalPoolCreateJob job = new LocalPoolCreateJob(idLocalPool, txRunner, idPool.getPoolName(),
idPool.getBlockSize(), idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
+ jobCoordinator.enqueueJob(localPoolName, job, IdUtils.RETRY_COUNT);
return idLocalPool;
}
private void deletePool(String poolName) {
- LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, broker, idUtils);
- DataStoreJobCoordinator.getInstance().enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
+ LocalPoolDeleteJob job = new LocalPoolDeleteJob(poolName, txRunner, idUtils);
+ jobCoordinator.enqueueJob(poolName, job, IdUtils.RETRY_COUNT);
}
public void poolDeleted(String parentPoolName, String poolName) {
localPool.put(parentPoolName, localPoolCache);
}
+ private List<Long> checkForIdInIdEntries(String parentPoolName, String idKey, String uniqueIdKey,
+ CompletableFuture<List<Long>> futureIdValues, boolean hasExistingFutureIdValues)
+ throws IdManagerException, ReadFailedException {
+ InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils.getIdPoolInstance(parentPoolName);
+ InstanceIdentifier<IdEntries> existingId = idUtils.getIdEntry(parentIdPoolInstanceIdentifier, idKey);
+ idUtils.lock(lockManager, uniqueIdKey);
+ List<Long> newIdValuesList = new ArrayList<>();
+ Optional<IdEntries> existingIdEntry =
+ singleTxDB.syncReadOptional(LogicalDatastoreType.CONFIGURATION, existingId);
+ if (existingIdEntry.isPresent()) {
+ newIdValuesList = existingIdEntry.get().getIdValue();
+ LOG.debug("Existing ids {} for the key {} ", newIdValuesList, idKey);
+ // Inform other waiting threads about this new value.
+ futureIdValues.complete(newIdValuesList);
+ // This is to avoid stale entries in the map. If this thread had populated the map,
+ // then the entry should be removed.
+ if (!hasExistingFutureIdValues) {
+ idUtils.removeAllocatedIds(uniqueIdKey);
+ }
+ idUtils.unlock(lockManager, uniqueIdKey);
+ return newIdValuesList;
+ }
+ return newIdValuesList;
+ }
+
+ private IdLocalPool getOrCreateLocalIdPool(String parentPoolName, String localPoolName)
+ throws IdManagerException, ReadFailedException {
+ IdLocalPool localIdPool = localPool.get(parentPoolName);
+ if (localIdPool == null) {
+ idUtils.lock(lockManager, parentPoolName);
+ try {
+ // Check if a previous thread that got the cluster-wide lock
+ // first, has created the localPool
+ InstanceIdentifier<IdPool> childIdPoolInstanceIdentifier = idUtils
+ .getIdPoolInstance(localPoolName);
+ IdPool childIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION,
+ childIdPoolInstanceIdentifier);
+ if (childIdPool != null) {
+ updateLocalIdPoolCache(childIdPool, parentPoolName);
+ }
+ if (localPool.get(parentPoolName) == null) {
+ try {
+ return txRunner.applyWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
+ InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier = idUtils
+ .getIdPoolInstance(parentPoolName);
+ Optional<IdPool> parentIdPool = confTx.read(parentIdPoolInstanceIdentifier).get();
+ if (parentIdPool.isPresent()) {
+ // Return localIdPool
+ return createLocalPool(confTx, localPoolName, parentIdPool.get());
+ } else {
+ throw new ExpectedDataObjectNotFoundException(LogicalDatastoreType.CONFIGURATION,
+ parentIdPoolInstanceIdentifier);
+ }
+ }).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IdManagerException("Error creating a local id pool", e);
+ }
+ } else {
+ localIdPool = localPool.get(parentPoolName);
+ }
+ } finally {
+ idUtils.unlock(lockManager, parentPoolName);
+ }
+ }
+ return localIdPool;
+ }
+
+ private void getRangeOfIds(String parentPoolName, String localPoolName, long size, List<Long> newIdValuesList,
+ IdLocalPool localIdPool, long newIdValue) throws ReadFailedException, IdManagerException {
+ InstanceIdentifier<IdPool> parentIdPoolInstanceIdentifier1 = idUtils.getIdPoolInstance(parentPoolName);
+ IdPool parentIdPool = singleTxDB.syncRead(LogicalDatastoreType.CONFIGURATION, parentIdPoolInstanceIdentifier1);
+ long totalAvailableIdCount = localIdPool.getAvailableIds().getAvailableIdCount()
+ + localIdPool.getReleasedIds().getAvailableIdCount();
+ AvailableIdsHolderBuilder availableParentIds = idUtils.getAvailableIdsHolderBuilder(parentIdPool);
+ ReleasedIdsHolderBuilder releasedParentIds = IdUtils.getReleaseIdsHolderBuilder(parentIdPool);
+ totalAvailableIdCount = totalAvailableIdCount + releasedParentIds.getAvailableIdCount()
+ + idUtils.getAvailableIdsCount(availableParentIds);
+ if (totalAvailableIdCount > size) {
+ while (size > 0) {
+ try {
+ newIdValue = getIdFromLocalPoolCache(localIdPool, parentPoolName);
+ } catch (IdManagerException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Releasing IDs to pool {}", localPoolName);
+ }
+ // Releasing the IDs added in newIdValuesList since
+ // a null list would be returned now, as the
+ // requested size of list IDs exceeds the number of
+ // available IDs.
+ updateDelayedEntriesInLocalCache(newIdValuesList, parentPoolName, localIdPool);
+ }
+ newIdValuesList.add(newIdValue);
+ size--;
+ }
+ } else {
+ throw new IdManagerException(String.format("Ids exhausted for pool : %s", parentPoolName));
+ }
+ }
}