X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=33634b1d6c3e322642e9b458e05d59feb74a577e;hp=b3feadcfb90d1dcfe9fdb26b1587e8d5a8c88eac;hb=1d7e8fd9d781f630dee9dfb1b509067dd7fb9caa;hpb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index b3feadcfb9..33634b1d6c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -97,16 +97,15 @@ final class ShardCommitCoordinator { * @param ready the ForwardedReadyTransaction message to process * @param sender the sender of the message * @param shard the transaction's shard actor - * @param schema */ void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender, final Shard shard) { log.debug("{}: Readying transaction {}, client version {}", name, - ready.getTransactionID(), ready.getTxnClientVersion()); + ready.getTransactionId(), ready.getTxnClientVersion()); final ShardDataTreeCohort cohort = ready.getTransaction().ready(); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); - cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); if (ready.isDoImmediateCommit()) { cohortEntry.setDoImmediateCommit(true); @@ -130,28 +129,28 @@ final class ShardCommitCoordinator { * @param sender the sender of the message */ void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { - CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); + CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId()); if (cohortEntry == null) { - cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()), + cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()), batched.getVersion()); - cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); } if (log.isDebugEnabled()) { log.debug("{}: Applying {} batched modifications for Tx {}", name, - batched.getModifications().size(), batched.getTransactionID()); + batched.getModifications().size(), batched.getTransactionId()); } cohortEntry.applyModifications(batched.getModifications()); if (batched.isReady()) { if (cohortEntry.getLastBatchedModificationsException() != null) { - cohortCache.remove(cohortEntry.getTransactionID()); + cohortCache.remove(cohortEntry.getTransactionId()); throw cohortEntry.getLastBatchedModificationsException(); } if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { - cohortCache.remove(cohortEntry.getTransactionID()); + cohortCache.remove(cohortEntry.getTransactionId()); throw new IllegalStateException(String.format( "The total number of batched messages received %d does not match the number sent %d", cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent())); @@ -159,7 +158,7 @@ final class ShardCommitCoordinator { if (log.isDebugEnabled()) { log.debug("{}: Readying Tx {}, client version {}", name, - batched.getTransactionID(), batched.getVersion()); + batched.getTransactionId(), batched.getVersion()); } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); @@ -187,13 +186,13 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { - final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(), + final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(), message.getModification()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); - cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); - log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID()); + log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId()); if (message.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -206,7 +205,7 @@ final class ShardCommitCoordinator { Collection createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) { - CohortEntry cohortEntry = cohortCache.remove(from.getTransactionID()); + CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId()); if (cohortEntry == null || cohortEntry.getTransaction() == null) { return Collections.singletonList(from); } @@ -217,9 +216,9 @@ final class ShardCommitCoordinator { cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() { @Override protected BatchedModifications getModifications() { - if (newModifications.isEmpty() || - newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { - newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion())); + if (newModifications.isEmpty() + || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { + newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion())); } return newModifications.getLast(); @@ -237,7 +236,7 @@ final class ShardCommitCoordinator { cohortEntry.canCommit(new FutureCallback() { @Override public void onSuccess(final Void result) { - log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionID()); + log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId()); if (cohortEntry.isDoImmediateCommit()) { doCommit(cohortEntry); @@ -249,12 +248,12 @@ final class ShardCommitCoordinator { } @Override - public void onFailure(final Throwable t) { + public void onFailure(final Throwable failure) { log.debug("{}: An exception occurred during canCommit for {}: {}", name, - cohortEntry.getTransactionID(), t); + cohortEntry.getTransactionId(), failure); - cohortCache.remove(cohortEntry.getTransactionID()); - cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self()); + cohortCache.remove(cohortEntry.getTransactionId()); + cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); } }); } @@ -287,7 +286,7 @@ final class ShardCommitCoordinator { } private void doCommit(final CohortEntry cohortEntry) { - log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID()); + log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId()); // We perform the preCommit phase here atomically with the commit phase. This is an // optimization to eliminate the overhead of an extra preCommit message. We lose front-end @@ -300,38 +299,38 @@ final class ShardCommitCoordinator { } @Override - public void onFailure(final Throwable t) { + public void onFailure(final Throwable failure) { log.error("{} An exception occurred while preCommitting transaction {}", name, - cohortEntry.getTransactionID(), t); + cohortEntry.getTransactionId(), failure); - cohortCache.remove(cohortEntry.getTransactionID()); - cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self()); + cohortCache.remove(cohortEntry.getTransactionId()); + cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); } }); } private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) { - log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); + log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId()); cohortEntry.commit(new FutureCallback() { @Override public void onSuccess(final UnsignedLong result) { - final TransactionIdentifier txId = cohortEntry.getTransactionID(); + final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result, sender); - cohortCache.remove(cohortEntry.getTransactionID()); + cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), cohortEntry.getShard().self()); } @Override - public void onFailure(final Throwable t) { + public void onFailure(final Throwable failure) { log.error("{}, An exception occurred while committing transaction {}", persistenceId(), - cohortEntry.getTransactionID(), t); + cohortEntry.getTransactionId(), failure); - cohortCache.remove(cohortEntry.getTransactionID()); - sender.tell(new Failure(t), cohortEntry.getShard().self()); + cohortCache.remove(cohortEntry.getTransactionId()); + sender.tell(new Failure(failure), cohortEntry.getShard().self()); } }); } @@ -359,6 +358,7 @@ final class ShardCommitCoordinator { doCommit(cohortEntry); } + @SuppressWarnings("checkstyle:IllegalCatch") void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.remove(transactionID); if (cohortEntry == null) { @@ -368,28 +368,32 @@ final class ShardCommitCoordinator { log.debug("{}: Aborting transaction {}", name, transactionID); final ActorRef self = shard.getSelf(); - try { - cohortEntry.abort(); + cohortEntry.abort(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + if (sender != null) { + sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); + } + } - shard.getShardMBean().incrementAbortTransactionsCount(); + @Override + public void onFailure(final Throwable failure) { + log.error("{}: An exception happened during abort", name, failure); - if (sender != null) { - sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); + if (sender != null) { + sender.tell(new Failure(failure), self); + } } - } catch (Exception e) { - log.error("{}: An exception happened during abort", name, e); + }); - if (sender != null) { - sender.tell(new Failure(e), self); - } - } + shard.getShardMBean().incrementAbortTransactionsCount(); } void checkForExpiredTransactions(final long timeout, final Shard shard) { Iterator iter = cohortCache.values().iterator(); while (iter.hasNext()) { CohortEntry cohortEntry = iter.next(); - if(cohortEntry.isFailed()) { + if (cohortEntry.isFailed()) { iter.remove(); } } @@ -434,7 +438,7 @@ final class ShardCommitCoordinator { } // Allocate a new message - final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionID(), + final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(), cohortEntry.getClientVersion()); newMessages.add(ret); return ret; @@ -454,12 +458,12 @@ final class ShardCommitCoordinator { switch (cohort.getState()) { case CAN_COMMIT_COMPLETE: case CAN_COMMIT_PENDING: - messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(), cohortEntry.getClientVersion())); break; case PRE_COMMIT_COMPLETE: case PRE_COMMIT_PENDING: - messages.add(new CommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CommitTransaction(cohortEntry.getTransactionId(), cohortEntry.getClientVersion())); break; default: