private final List<DelayedListenerRegistration> delayedListenerRegistrations =
Lists.newArrayList();
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
- private final DataPersistenceProvider dataPersistenceProvider;
+ private DataPersistenceProvider dataPersistenceProvider;
private SchemaContext schemaContext;
private final ShardCommitCoordinator commitCoordinator;
- private final long transactionCommitTimeout;
+ private long transactionCommitTimeout;
private Cancellable txCommitTimeoutCheckSchedule;
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
- transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ setTransactionCommitTimeout();
// create a notifier actor for each cluster member
roleChangeNotifier = createRoleChangeNotifier(name.toString());
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
}
+ private void setTransactionCommitTimeout() {
+ transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ }
+
private static Map<String, String> mapPeerAddresses(
final Map<ShardIdentifier, String> peerAddresses) {
Map<String, String> map = new HashMap<>();
@Override
public void postStop() {
+ LOG.info("Stopping Shard {}", persistenceId());
+
super.postStop();
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
+
+ shardMBean.unregisterMBean();
}
@Override
resolved.getPeerAddress());
} else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
handleTransactionCommitTimeoutCheck();
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
} else {
super.onReceiveCommand(message);
}
return roleChangeNotifier;
}
+ private void onDatastoreContext(DatastoreContext context) {
+ datastoreContext = context;
+
+ commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+ setTransactionCommitTimeout();
+
+ if(datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+ dataPersistenceProvider = new PersistentDataProvider();
+ } else if(!datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof PersistentDataProvider) {
+ dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ }
+
+ updateConfigParams(datastoreContext.getShardRaftConfig());
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {