X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=6cf16b44268c6c16e26e0658632f61994ee33971;hb=34bc6ec632529a0dfe419aa7404bb42a456fbc96;hp=97a9ff0bf379ef3b8f6568ed37603ed285ba35a5;hpb=47a2c420e5f271cb78ce10ea96883c17cc4b2cc9;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 97a9ff0bf3..6cf16b4426 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -224,8 +224,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { new TransactionProxyCleanupPhantomReference(this); phantomReferenceCache.put(cleanup, cleanup); } - - LOG.debug("Created txn {} of type {}", identifier, transactionType); + if(LOG.isDebugEnabled()) { + LOG.debug("Created txn {} of type {}", identifier, transactionType); + } } @Override @@ -235,8 +236,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Read operation on write-only transaction is not allowed"); - LOG.debug("Tx {} read {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} read {}", identifier, path); + } createTransactionIfMissing(actorContext, path); return transactionContext(path).readData(path); @@ -248,8 +250,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Exists operation on write-only transaction is not allowed"); - LOG.debug("Tx {} exists {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} exists {}", identifier, path); + } createTransactionIfMissing(actorContext, path); return transactionContext(path).dataExists(path); @@ -267,8 +270,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); - LOG.debug("Tx {} write {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} write {}", identifier, path); + } createTransactionIfMissing(actorContext, path); transactionContext(path).writeData(path, data); @@ -279,8 +283,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); - LOG.debug("Tx {} merge {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} merge {}", identifier, path); + } createTransactionIfMissing(actorContext, path); transactionContext(path).mergeData(path, data); @@ -290,9 +295,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void delete(YangInstanceIdentifier path) { checkModificationState(); - - LOG.debug("Tx {} delete {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} delete {}", identifier, path); + } createTransactionIfMissing(actorContext, path); transactionContext(path).deleteData(path); @@ -305,16 +310,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { inReadyState = true; - LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier, + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); - + } List> cohortPathFutures = Lists.newArrayList(); for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - LOG.debug("Tx {} Readying transaction for shard {}", identifier, + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); - + } cohortPathFutures.add(transactionContext.readyTransaction()); } @@ -381,8 +388,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath); + } ActorSelection transactionActor = actorContext.actorSelection(transactionPath); if (transactionType == TransactionType.READ_ONLY) { @@ -404,7 +412,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { "Invalid reply type {} for CreateTransaction", response.getClass())); } } catch (Exception e) { - LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); + } remoteTransactionPaths .put(shardName, new NoOpTransactionContext(shardName, e, identifier)); } @@ -489,15 +499,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void closeTransaction() { - LOG.debug("Tx {} closeTransaction called", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} closeTransaction called", identifier); + } actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable()); } @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", identifier, recordedOperationFutures.size()); - + } // Send the ReadyTransaction message to the Tx actor. final Future replyFuture = actorContext.executeRemoteOperationAsync(getActor(), @@ -522,10 +535,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return combinedFutures.transform(new AbstractFunction1, ActorPath>() { @Override public ActorPath apply(Iterable notUsed) { - - LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", identifier); - + } // At this point all the Futures succeeded and we need to extract the cohort // actor path from the ReadyTransactionReply. For the recorded operations, they // don't return any data so we're only interested that they completed @@ -543,9 +556,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { String resolvedCohortPath = getResolvedCohortPath( reply.getCohortPath().toString()); - LOG.debug("Tx {} readyTransaction: resolved cohort path {}", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction: resolved cohort path {}", identifier, resolvedCohortPath); - + } return actorContext.actorFor(resolvedCohortPath); } else { // Throwing an exception here will fail the Future. @@ -559,21 +573,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void deleteData(YangInstanceIdentifier path) { - LOG.debug("Tx {} deleteData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} deleteData called path = {}", identifier, path); + } recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() )); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} mergeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} mergeData called path = {}", identifier, path); + } recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), new MergeData(path, data, schemaContext).toSerializable())); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} writeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} writeData called path = {}", identifier, path); + } recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), new WriteData(path, data, schemaContext).toSerializable())); } @@ -582,8 +602,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> readData( final YangInstanceIdentifier path) { - LOG.debug("Tx {} readData called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData called path = {}", identifier, path); + } final SettableFuture>> returnFuture = SettableFuture.create(); // If there were any previous recorded put/merge/delete operation reply Futures then we @@ -593,9 +614,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(recordedOperationFutures.isEmpty()) { finishReadData(path, returnFuture); } else { - LOG.debug("Tx {} readData: verifying {} previous recorded operations", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData: verifying {} previous recorded operations", identifier, recordedOperationFutures.size()); - + } // Note: we make a copy of recordedOperationFutures to be on the safe side in case // Futures#sequence accesses the passed List on a different thread, as // recordedOperationFutures is not synchronized. @@ -608,9 +630,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void onComplete(Throwable failure, Iterable notUsed) throws Throwable { if(failure != null) { - LOG.debug("Tx {} readData: a recorded operation failed: {}", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData: a recorded operation failed: {}", identifier, failure); - + } returnFuture.setException(new ReadFailedException( "The read could not be performed because a previous put, merge," + "or delete operation failed", failure)); @@ -629,20 +652,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void finishReadData(final YangInstanceIdentifier path, final SettableFuture>> returnFuture) { - LOG.debug("Tx {} finishReadData called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} finishReadData called path = {}", identifier, path); + } OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object readResponse) throws Throwable { if(failure != null) { - LOG.debug("Tx {} read operation failed: {}", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} read operation failed: {}", identifier, failure); + } returnFuture.setException(new ReadFailedException( "Error reading data for path " + path, failure)); } else { - LOG.debug("Tx {} read operation succeeded", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} read operation succeeded", identifier, failure); + } if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse); @@ -669,8 +695,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture dataExists( final YangInstanceIdentifier path) { - LOG.debug("Tx {} dataExists called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists called path = {}", identifier, path); + } final SettableFuture returnFuture = SettableFuture.create(); // If there were any previous recorded put/merge/delete operation reply Futures then we @@ -681,9 +708,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(recordedOperationFutures.isEmpty()) { finishDataExists(path, returnFuture); } else { - LOG.debug("Tx {} dataExists: verifying {} previous recorded operations", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists: verifying {} previous recorded operations", identifier, recordedOperationFutures.size()); - + } // Note: we make a copy of recordedOperationFutures to be on the safe side in case // Futures#sequence accesses the passed List on a different thread, as // recordedOperationFutures is not synchronized. @@ -696,9 +724,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void onComplete(Throwable failure, Iterable notUsed) throws Throwable { if(failure != null) { - LOG.debug("Tx {} dataExists: a recorded operation failed: {}", + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists: a recorded operation failed: {}", identifier, failure); - + } returnFuture.setException(new ReadFailedException( "The data exists could not be performed because a previous " + "put, merge, or delete operation failed", failure)); @@ -717,19 +746,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private void finishDataExists(final YangInstanceIdentifier path, final SettableFuture returnFuture) { - LOG.debug("Tx {} finishDataExists called path = {}", identifier, path); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} finishDataExists called path = {}", identifier, path); + } OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object response) throws Throwable { if(failure != null) { - LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure); + } returnFuture.setException(new ReadFailedException( "Error checking data exists for path " + path, failure)); } else { - LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); - + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); + } if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { returnFuture.set(Boolean.valueOf(DataExistsReply. fromSerializable(response).exists())); @@ -761,34 +793,46 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void closeTransaction() { - LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier); + } } @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called", identifier); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readyTransaction called", identifier); + } return akka.dispatch.Futures.failed(failure); } @Override public void deleteData(YangInstanceIdentifier path) { - LOG.debug("Tx {} deleteData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} deleteData called path = {}", identifier, path); + } } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} mergeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} mergeData called path = {}", identifier, path); + } } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} writeData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} writeData called path = {}", identifier, path); + } } @Override public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { - LOG.debug("Tx {} readData called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} readData called path = {}", identifier, path); + } return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error reading data for path " + path, failure)); } @@ -796,7 +840,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public CheckedFuture dataExists( YangInstanceIdentifier path) { - LOG.debug("Tx {} dataExists called path = {}", identifier, path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} dataExists called path = {}", identifier, path); + } return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error checking exists for path " + path, failure)); }