+ LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().commit().get();
+
+ sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ } catch (InterruptedException | ExecutionException e) {
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+ LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ shardMBean.incrementFailedTransactionsCount();
+ }
+
+ commitCoordinator.currentTransactionComplete(transactionID, true);
+ }
+
+ private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
+ LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+ commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+ }
+
+ private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
+ LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
+ ready.getTxnClientVersion());
+
+ // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
+ // commitCoordinator in preparation for the subsequent three phase commit initiated by
+ // the front-end.
+ commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
+ ready.getModification());
+
+ // Return our actor path as we'll handle the three phase commit, except if the Tx client
+ // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+ // node. In that case, the subsequent 3-phase commit messages won't contain the
+ // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+ // to provide the compatible behavior.
+ ActorRef replyActorPath = self();
+ if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+ LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
+
+ ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+ Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
+ }
+
+ private void handleAbortTransaction(AbortTransaction abort) {
+ doAbortTransaction(abort.getTransactionID(), getSender());
+ }
+
+ private void doAbortTransaction(String transactionID, final ActorRef sender) {
+ final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry != null) {
+ LOG.debug("Aborting transaction {}", transactionID);
+
+ // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+ // aborted during replication in which case we may still commit locally if replication
+ // succeeds.
+ commitCoordinator.currentTransactionComplete(transactionID, false);
+
+ final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
+ final ActorRef self = getSelf();
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void v) {
+ shardMBean.incrementAbortTransactionsCount();
+
+ if(sender != null) {
+ sender.tell(new AbortTransactionReply().toSerializable(), self);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during abort");
+
+ if(sender != null) {
+ sender.tell(new akka.actor.Status.Failure(t), self);
+ }
+ }
+ });
+ }
+ }
+
+ private void handleCreateTransaction(Object message) {
+ if (isLeader()) {
+ createTransaction(CreateTransaction.fromSerializable(message));
+ } else if (getLeader() != null) {
+ getLeader().forward(message, getContext());
+ } else {
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+ "Could not find shard leader so transaction cannot be created. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.")), getSelf());
+ }
+ }
+
+ private void handleReadDataReply(Object message) {
+ // This must be for install snapshot. Don't want to open this up and trigger
+ // deSerialization
+
+ self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+ self());
+
+ createSnapshotTransaction = null;
+
+ // Send a PoisonPill instead of sending close transaction because we do not really need
+ // a response
+ getSender().tell(PoisonPill.getInstance(), self());
+ }
+
+ private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
+ DOMStoreTransactionChain chain =
+ transactionChains.remove(closeTransactionChain.getTransactionChainId());
+
+ if(chain != null) {
+ chain.close();
+ }
+ }
+
+ private ActorRef createTypedTransactionActor(int transactionType,
+ ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+
+ DOMStoreTransactionFactory factory = store;
+
+ if(!transactionChainId.isEmpty()) {
+ factory = transactionChains.get(transactionChainId);
+ if(factory == null){
+ DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+ transactionChains.put(transactionChainId, transactionChain);
+ factory = transactionChain;
+ }
+ }
+
+ if(this.schemaContext == null) {
+ throw new IllegalStateException("SchemaContext is not set");
+ }
+
+ if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {