private ActorRef replySender;
private Shard shard;
- CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
+ private CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = Preconditions.checkNotNull(transactionID);
this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+ private CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
SchemaContext schema, short clientVersion) {
this.transactionID = Preconditions.checkNotNull(transactionID);
this.cohort = cohort;
this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
+ static CohortEntry createOpen(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
+ DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
+ return new CohortEntry(transactionID, transaction, cohortRegistry, schema, clientVersion);
+ }
+
+ static CohortEntry createReady(TransactionIdentifier transactionID, ShardDataTreeCohort cohort,
+ DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
+ return new CohortEntry(transactionID, cohort, cohortRegistry, schema, clientVersion);
+ }
+
void updateLastAccessTime() {
lastAccessTimer.reset();
lastAccessTimer.start();
ready.getTransactionID(), ready.getTxnClientVersion());
final ShardDataTreeCohort cohort = ready.getTransaction().ready();
- final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
+ final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry,
+ schema, ready.getTxnClientVersion());
cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
if(!queueCohortEntry(cohortEntry, sender, shard)) {
void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
if(cohortEntry == null) {
- cohortEntry = new CohortEntry(batched.getTransactionID(),
+ cohortEntry = CohortEntry.createOpen(batched.getTransactionID(),
dataTree.newReadWriteTransaction(batched.getTransactionID()),
cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
message.getTransactionID());
- final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry,
+ final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry,
dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());