+ public void setQueueCapacity(int queueCapacity) {
+ this.queueCapacity = queueCapacity;
+ }
+
+ private ReadyTransactionReply readyTransactionReply(Shard shard) {
+ if(readyTransactionReply == null) {
+ readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
+ }
+
+ return readyTransactionReply;
+ }
+
+ private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
+ if(queuedCohortEntries.size() < queueCapacity) {
+ queuedCohortEntries.offer(cohortEntry);
+ return true;
+ } else {
+ cohortCache.remove(cohortEntry.getTransactionID());
+
+ RuntimeException ex = new RuntimeException(
+ String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
+ " capacity %d has been reached.",
+ name, cohortEntry.getTransactionID(), queueCapacity));
+ log.error(ex.getMessage());
+ sender.tell(new Status.Failure(ex), shard.self());
+ return false;
+ }
+ }
+
+ /**
+ * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
+ * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
+ */
+ public void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+ log.debug("{}: Readying transaction {}, client version {}", name,
+ ready.getTransactionID(), ready.getTxnClientVersion());
+
+ CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
+ (MutableCompositeModification) ready.getModification());
+ cohortCache.put(ready.getTransactionID(), cohortEntry);
+
+ if(!queueCohortEntry(cohortEntry, sender, shard)) {
+ return;
+ }
+
+ if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+ // Return our actor path as we'll handle the three phase commit except if the Tx client
+ // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
+ // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
+ // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
+ ActorRef replyActorPath = shard.self();
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
+ replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
+
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+ ready.getTxnClientVersion());
+ sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, shard.self());
+ } else {
+ if(ready.isDoImmediateCommit()) {
+ cohortEntry.setDoImmediateCommit(true);
+ cohortEntry.setReplySender(sender);
+ cohortEntry.setShard(shard);
+ handleCanCommit(cohortEntry);
+ } else {
+ // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+ // front-end so send back a ReadyTransactionReply with our actor path.
+ sender.tell(readyTransactionReply(shard), shard.self());
+ }
+ }