+ throw new IllegalArgumentException(
+ "Shard="+name + ":CreateTransaction message has unidentified transaction type="
+ + transactionType);
+ }
+ }
+
+ private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
+ short clientVersion){
+ return getContext().actorOf(
+ ShardTransaction.props(transaction, getSelf(),
+ schemaContext, datastoreContext, shardMBean,
+ transactionId.getRemoteTransactionId(), clientVersion)
+ .withDispatcher(txnDispatcherPath),
+ transactionId.toString());
+
+ }
+
+ private void createTransaction(CreateTransaction createTransaction) {
+ try {
+ ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
+ createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+ createTransaction.getVersion());
+
+ getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(), getSelf());
+ } catch (Exception e) {
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ }
+
+ private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+ String transactionChainId, short clientVersion) {
+
+ ShardTransactionIdentifier transactionId =
+ ShardTransactionIdentifier.builder()
+ .remoteTransactionId(remoteTransactionId)
+ .build();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+ }
+
+ ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+ transactionChainId, clientVersion);
+
+ return transactionActor;
+ }
+
+ private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+ throws ExecutionException, InterruptedException {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ }
+
+ private void commitWithNewTransaction(final Modification modification) {
+ DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+ modification.apply(tx);
+ try {
+ syncCommitTransaction(tx);
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+ } catch (InterruptedException | ExecutionException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error("{}: Failed to commit", persistenceId(), e);