private static final int PERIODICITY_IN_MS = 500;
private static final int BATCH_SIZE = 1000;
- public enum SHARD_RESOURCE {
+ public enum ShardResource {
CONFIG_TOPOLOGY(LogicalDatastoreType.CONFIGURATION),
OPERATIONAL_TOPOLOGY(LogicalDatastoreType.OPERATIONAL),
CONFIG_INVENTORY(LogicalDatastoreType.CONFIGURATION),
BlockingQueue<ActionableResource> queue = new LinkedBlockingQueue<>();
LogicalDatastoreType datastoreType;
- SHARD_RESOURCE(LogicalDatastoreType datastoreType) {
+ ShardResource(LogicalDatastoreType datastoreType) {
this.datastoreType = datastoreType;
}
- public LogicalDatastoreType getDatastoreType(){
+ public LogicalDatastoreType getDatastoreType() {
return datastoreType;
}
+
BlockingQueue<ActionableResource> getQueue() {
return queue;
}
throw new RuntimeException("Resource type already registered");
}
resourceHandlerMapper.put(resourceType, new ImmutablePair<>(resQueue, resHandler));
- ScheduledThreadPoolExecutor resDelegatorService = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
+ ScheduledThreadPoolExecutor resDelegatorService = (ScheduledThreadPoolExecutor) Executors
+ .newScheduledThreadPool(1);
resourceBatchingThreadMapper.put(resourceType, resDelegatorService);
LOG.info("Registered resourceType {} with batchSize {} and batchInterval {}", resourceType,
resHandler.getBatchSize(), resHandler.getBatchInterval());
Integer batchSize = Integer.getInteger("resource.manager.batch.size", BATCH_SIZE);
Integer batchInterval = Integer.getInteger("resource.manager.batch.periodicity.ms", PERIODICITY_IN_MS);
- for (SHARD_RESOURCE shardResource : SHARD_RESOURCE.values()) {
+ for (ShardResource shardResource : ShardResource.values()) {
if (resourceHandlerMapper.containsKey(shardResource.name())) {
continue;
}
- DefaultBatchHandler batchHandler = new DefaultBatchHandler(broker, shardResource.datastoreType, batchSize, batchInterval);
+ DefaultBatchHandler batchHandler = new DefaultBatchHandler(broker, shardResource.datastoreType, batchSize,
+ batchInterval);
registerBatchableResource(shardResource.name(), shardResource.getQueue(), batchHandler);
}
}
- public void put(SHARD_RESOURCE shardResource, InstanceIdentifier identifier, DataObject updatedData) {
+ public void merge(ShardResource shardResource, InstanceIdentifier identifier, DataObject updatedData) {
BlockingQueue<ActionableResource> queue = shardResource.getQueue();
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
- identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
+ identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
queue.add(actResource);
}
}
- public void merge(SHARD_RESOURCE shardResource, InstanceIdentifier identifier, DataObject updatedData) {
- BlockingQueue<ActionableResource> queue = shardResource.getQueue();
+ public void merge(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
+ BlockingQueue<ActionableResource> queue = getQueue(resourceType);
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
}
}
- public void delete(SHARD_RESOURCE shardResource, InstanceIdentifier identifier) {
+ public void delete(ShardResource shardResource, InstanceIdentifier identifier) {
BlockingQueue<ActionableResource> queue = shardResource.getQueue();
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
}
}
- public void put(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
+ public void delete(String resourceType, InstanceIdentifier identifier) {
BlockingQueue<ActionableResource> queue = getQueue(resourceType);
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
- identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
+ identifier, ActionableResource.DELETE, null, null/*oldData*/);
queue.add(actResource);
}
}
- public void merge(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
- BlockingQueue<ActionableResource> queue = getQueue(resourceType);
+ public void put(ShardResource shardResource, InstanceIdentifier identifier, DataObject updatedData) {
+ BlockingQueue<ActionableResource> queue = shardResource.getQueue();
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
- identifier, ActionableResource.UPDATE, updatedData, null/*oldData*/);
+ identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
queue.add(actResource);
}
}
- public void delete(String resourceType, InstanceIdentifier identifier) {
+ public void put(String resourceType, InstanceIdentifier identifier, DataObject updatedData) {
BlockingQueue<ActionableResource> queue = getQueue(resourceType);
if (queue != null) {
ActionableResource actResource = new ActionableResourceImpl(identifier.toString(),
- identifier, ActionableResource.DELETE, null, null/*oldData*/);
+ identifier, ActionableResource.CREATE, updatedData, null/*oldData*/);
queue.add(actResource);
}
}
return null;
}
-
public void deregisterBatchableResource(String resourceType) {
resourceHandlerMapper.remove(resourceType);
resourceBatchingThreadMapper.remove(resourceType);
for (SubTransaction object : transactionObjects) {
WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
switch (object.getAction()) {
- case SubTransaction.CREATE :
- writeTransaction.put(dsType, object.getInstanceIdentifier(), (DataObject)object.getInstance(), true);
+ case SubTransaction.CREATE:
+ writeTransaction.put(dsType, object.getInstanceIdentifier(),
+ (DataObject) object.getInstance(), true);
break;
- case SubTransaction.DELETE :
+ case SubTransaction.DELETE:
writeTransaction.delete(dsType, object.getInstanceIdentifier());
break;
- case SubTransaction.UPDATE :
- writeTransaction.merge(dsType, object.getInstanceIdentifier(), (DataObject)object.getInstance(), true);
+ case SubTransaction.UPDATE:
+ writeTransaction.merge(dsType, object.getInstanceIdentifier(),
+ (DataObject) object.getInstance(), true);
break;
default:
- LOG.error("Unable to determine Action for transaction object with id {}", object.getInstanceIdentifier());
+ LOG.error("Unable to determine Action for transaction object with id {}",
+ object.getInstanceIdentifier());
}
- CheckedFuture<Void, TransactionCommitFailedException> futureOperation = writeTransaction.submit();
+ CheckedFuture<Void, TransactionCommitFailedException> futureOperation = writeTransaction
+ .submit();
try {
futureOperation.get();
} catch (InterruptedException | ExecutionException exception) {
}
}
}
-
- } catch (final Exception e) {
- LOG.error("Transaction submission failed", e);
+ } finally {
+ LOG.error("Transaction submission failed");
}
}
}