X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=2773a3e3bfd21ad4559bb6baf05c1d48661cfe9c;hb=057b787289f7b909d7013c22ac73a1c91c860af8;hp=b3feadcfb90d1dcfe9fdb26b1587e8d5a8c88eac;hpb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index b3feadcfb9..2773a3e3bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -97,16 +99,15 @@ final class ShardCommitCoordinator { * @param ready the ForwardedReadyTransaction message to process * @param sender the sender of the message * @param shard the transaction's shard actor - * @param schema */ void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender, final Shard shard) { log.debug("{}: Readying transaction {}, client version {}", name, - ready.getTransactionID(), ready.getTxnClientVersion()); + ready.getTransactionId(), ready.getTxnClientVersion()); final ShardDataTreeCohort cohort = ready.getTransaction().ready(); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); - cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); if (ready.isDoImmediateCommit()) { cohortEntry.setDoImmediateCommit(true); @@ -130,28 +131,28 @@ final class ShardCommitCoordinator { * @param sender the sender of the message */ void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { - CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID()); + CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId()); if (cohortEntry == null) { - cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()), + cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()), batched.getVersion()); - cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); } if (log.isDebugEnabled()) { log.debug("{}: Applying {} batched modifications for Tx {}", name, - batched.getModifications().size(), batched.getTransactionID()); + batched.getModifications().size(), batched.getTransactionId()); } cohortEntry.applyModifications(batched.getModifications()); if (batched.isReady()) { if (cohortEntry.getLastBatchedModificationsException() != null) { - cohortCache.remove(cohortEntry.getTransactionID()); + cohortCache.remove(cohortEntry.getTransactionId()); throw cohortEntry.getLastBatchedModificationsException(); } if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { - cohortCache.remove(cohortEntry.getTransactionID()); + cohortCache.remove(cohortEntry.getTransactionId()); throw new IllegalStateException(String.format( "The total number of batched messages received %d does not match the number sent %d", cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent())); @@ -159,7 +160,7 @@ final class ShardCommitCoordinator { if (log.isDebugEnabled()) { log.debug("{}: Readying Tx {}, client version {}", name, - batched.getTransactionID(), batched.getVersion()); + batched.getTransactionId(), batched.getVersion()); } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); @@ -187,13 +188,13 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { - final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(), + final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(), message.getModification()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); - cohortCache.put(cohortEntry.getTransactionID(), cohortEntry); + cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); - log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID()); + log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId()); if (message.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -206,7 +207,7 @@ final class ShardCommitCoordinator { Collection createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) { - CohortEntry cohortEntry = cohortCache.remove(from.getTransactionID()); + CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId()); if (cohortEntry == null || cohortEntry.getTransaction() == null) { return Collections.singletonList(from); } @@ -217,9 +218,9 @@ final class ShardCommitCoordinator { cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() { @Override protected BatchedModifications getModifications() { - if (newModifications.isEmpty() || - newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { - newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion())); + if (newModifications.isEmpty() + || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) { + newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion())); } return newModifications.getLast(); @@ -237,7 +238,7 @@ final class ShardCommitCoordinator { cohortEntry.canCommit(new FutureCallback() { @Override public void onSuccess(final Void result) { - log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionID()); + log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId()); if (cohortEntry.isDoImmediateCommit()) { doCommit(cohortEntry); @@ -249,12 +250,12 @@ final class ShardCommitCoordinator { } @Override - public void onFailure(final Throwable t) { + public void onFailure(final Throwable failure) { log.debug("{}: An exception occurred during canCommit for {}: {}", name, - cohortEntry.getTransactionID(), t); + cohortEntry.getTransactionId(), failure); - cohortCache.remove(cohortEntry.getTransactionID()); - cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self()); + cohortCache.remove(cohortEntry.getTransactionId()); + cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); } }); } @@ -287,7 +288,7 @@ final class ShardCommitCoordinator { } private void doCommit(final CohortEntry cohortEntry) { - log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID()); + log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId()); // We perform the preCommit phase here atomically with the commit phase. This is an // optimization to eliminate the overhead of an extra preCommit message. We lose front-end @@ -300,38 +301,38 @@ final class ShardCommitCoordinator { } @Override - public void onFailure(final Throwable t) { + public void onFailure(final Throwable failure) { log.error("{} An exception occurred while preCommitting transaction {}", name, - cohortEntry.getTransactionID(), t); + cohortEntry.getTransactionId(), failure); - cohortCache.remove(cohortEntry.getTransactionID()); - cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self()); + cohortCache.remove(cohortEntry.getTransactionId()); + cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); } }); } private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) { - log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); + log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId()); cohortEntry.commit(new FutureCallback() { @Override public void onSuccess(final UnsignedLong result) { - final TransactionIdentifier txId = cohortEntry.getTransactionID(); + final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result, sender); - cohortCache.remove(cohortEntry.getTransactionID()); + cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), cohortEntry.getShard().self()); } @Override - public void onFailure(final Throwable t) { + public void onFailure(final Throwable failure) { log.error("{}, An exception occurred while committing transaction {}", persistenceId(), - cohortEntry.getTransactionID(), t); + cohortEntry.getTransactionId(), failure); - cohortCache.remove(cohortEntry.getTransactionID()); - sender.tell(new Failure(t), cohortEntry.getShard().self()); + cohortCache.remove(cohortEntry.getTransactionId()); + sender.tell(new Failure(failure), cohortEntry.getShard().self()); } }); } @@ -359,6 +360,7 @@ final class ShardCommitCoordinator { doCommit(cohortEntry); } + @SuppressWarnings("checkstyle:IllegalCatch") void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.remove(transactionID); if (cohortEntry == null) { @@ -376,7 +378,7 @@ final class ShardCommitCoordinator { if (sender != null) { sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } - } catch (Exception e) { + } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("{}: An exception happened during abort", name, e); if (sender != null) { @@ -389,7 +391,7 @@ final class ShardCommitCoordinator { Iterator iter = cohortCache.values().iterator(); while (iter.hasNext()) { CohortEntry cohortEntry = iter.next(); - if(cohortEntry.isFailed()) { + if (cohortEntry.isFailed()) { iter.remove(); } } @@ -434,7 +436,7 @@ final class ShardCommitCoordinator { } // Allocate a new message - final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionID(), + final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(), cohortEntry.getClientVersion()); newMessages.add(ret); return ret; @@ -454,12 +456,12 @@ final class ShardCommitCoordinator { switch (cohort.getState()) { case CAN_COMMIT_COMPLETE: case CAN_COMMIT_PENDING: - messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(), cohortEntry.getClientVersion())); break; case PRE_COMMIT_COMPLETE: case PRE_COMMIT_PENDING: - messages.add(new CommitTransaction(cohortEntry.getTransactionID(), + messages.add(new CommitTransaction(cohortEntry.getTransactionId(), cohortEntry.getClientVersion())); break; default: