-
- acquireLock();
- LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
- rwTx.merge(store, path, data);
- }
-
- public FluentFuture<? extends @NonNull CommitInfo> commit() {
- acquireLock();
- FluentFuture<? extends @NonNull CommitInfo> future = null;
- future = rwTx.commit();
- releaseLock();
- resetRwTx();
- return future;
- }
-
- public void close() {
- releaseLock();
- }
-
- private void acquireLock() {
- if (!lock.writeLock().isHeldByCurrentThread()) {
- lock.writeLock().lock();
- LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength());
- LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName());
- rwTx = resetRwTx();
- } else {
- LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName());
+ String thread = Thread.currentThread().getName();
+ writeL.lock();
+ LOG.debug("merge locked {} by {}", store, thread);
+ try {
+ if (!writeTrMap.containsKey(thread)) {
+ writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
+ }
+ writeTrMap.get(thread).merge(store, path, data);