+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+ }
+
+ protected String deserializeCohortPath(String cohortPath) {
+ return cohortPath;
+ }
+
+ private void batchModification(Modification modification) {
+ if(batchedModifications == null) {
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
+ }
+
+ batchedModifications.addModification(modification);
+
+ if(batchedModifications.getModifications().size() >=
+ actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+ sendAndRecordBatchedModifications();
+ }
+ }
+
+ private void sendAndRecordBatchedModifications() {
+ Future<Object> sentFuture = sendBatchedModifications();
+ if(sentFuture != null) {
+ recordedOperationFutures.add(sentFuture);
+ }
+ }
+
+ protected Future<Object> sendBatchedModifications() {
+ return sendBatchedModifications(false);
+ }
+
+ protected Future<Object> sendBatchedModifications(boolean ready) {
+ Future<Object> sent = null;
+ if(batchedModifications != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+ batchedModifications.getModifications().size(), ready);
+ }
+
+ batchedModifications.setReady(ready);
+ sent = executeOperationAsync(batchedModifications);
+
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
+ }
+
+ return sent;