new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name);
} else {
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
roleChangeNotifier.get().forward(message, context());
} else if (message instanceof FollowerInitialSyncUpStatus) {
shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
context().parent().tell(message, self());
roleChangeNotifier.get().forward(message, context());
} else if (message instanceof FollowerInitialSyncUpStatus) {
shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
context().parent().tell(message, self());
sender().tell(getShardMBean(), self());
} else if (message instanceof GetShardDataTree) {
sender().tell(store.getDataTree(), self());
sender().tell(getShardMBean(), self());
} else if (message instanceof GetShardDataTree) {
sender().tell(store.getDataTree(), self());
context().parent().forward(message, context());
} else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
context().parent().forward(message, context());
} else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
- protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) {
+ protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ final short leaderPayloadVersion) {
return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
: new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
: new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
// we need to reconstruct previous BatchedModifications from the transaction
// DataTreeModification, honoring the max batched modification count, and forward all the
// previous BatchedModifications to the new leader.
// we need to reconstruct previous BatchedModifications from the transaction
// DataTreeModification, honoring the max batched modification count, and forward all the
// previous BatchedModifications to the new leader.
- Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
- batched, datastoreContext.getShardBatchedModificationCount());
+ Collection<BatchedModifications> newModifications = commitCoordinator
+ .createForwardedBatchedModifications(batched,
+ datastoreContext.getShardBatchedModificationCount());
LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
newModifications.size(), leader);
LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
newModifications.size(), leader);
private boolean failIfIsolatedLeader(final ActorRef sender) {
if (isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
private boolean failIfIsolatedLeader(final ActorRef sender) {
if (isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Shard %s was the leader but has lost contact with all of its followers. Either all" +
- " other follower nodes are down or this node is isolated by a network partition.",
+ "Shard %s was the leader but has lost contact with all of its followers. Either all"
+ + " other follower nodes are down or this node is isolated by a network partition.",
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
- if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
- failIfIsolatedLeader(getSender())) {
+ if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
+ && failIfIsolatedLeader(getSender())) {
- commitCoordinator.abortPendingTransactions(
- "The transacton was aborted due to inflight leadership change and the leader address isn't available.",
- this);
+ commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ + "change and the leader address isn't available.", this);
private final Class<S> shardClass;
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private final Class<S> shardClass;
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
- case CONFIGURATION:
- return TreeType.CONFIGURATION;
- case OPERATIONAL:
- return TreeType.OPERATIONAL;
+ case CONFIGURATION:
+ return TreeType.CONFIGURATION;
+ case OPERATIONAL:
+ return TreeType.OPERATIONAL;
+ default:
+ throw new IllegalStateException("Unhandled logical store type "
+ + datastoreContext.getLogicalStoreType());