+
+ // Throwing an exception here will fail the Future.
+ throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+ getIdentifier(), serializedReadyReply.getClass()));
+ }
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+ }
+
+ protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+ return readyTxReply.getCohortPath();
+ }
+
+ private BatchedModifications newBatchedModifications() {
+ return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
+ }
+
+ private void batchModification(Modification modification) {
+ if(batchedModifications == null) {
+ batchedModifications = newBatchedModifications();
+ }
+
+ batchedModifications.addModification(modification);
+
+ if(batchedModifications.getModifications().size() >=
+ actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+ sendBatchedModifications();
+ }
+ }
+
+ protected Future<Object> sendBatchedModifications() {
+ return sendBatchedModifications(false, false);
+ }
+
+ protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
+ Future<Object> sent = null;
+ if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+ if(batchedModifications == null) {
+ batchedModifications = newBatchedModifications();
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
+ batchedModifications.getModifications().size(), ready);
+ }
+
+ batchedModifications.setReady(ready);
+ batchedModifications.setDoCommitOnReady(doCommitOnReady);
+ batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
+ sent = executeOperationAsync(batchedModifications);
+
+ if(ready) {
+ batchedModifications = null;
+ } else {
+ batchedModifications = newBatchedModifications();