import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
+ super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
+ DataStoreVersions.CURRENT_VERSION);
this.name = name.toString();
this.datastoreContext = datastoreContext;
} else if (message instanceof ForwardedReadyTransaction) {
commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
getSender(), this);
+ } else if (message instanceof ReadyLocalTransaction) {
+ handleReadyLocalTransaction((ReadyLocalTransaction)message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
} else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
+ private void noLeaderError(Object message) {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+
private void handleBatchedModifications(BatchedModifications batched) {
// This message is sent to prepare the modificationsa transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
- // it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find the leader for shard %s. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ noLeaderError(batched);
+ }
+ }
+ }
+
+ private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+ if (isLeader()) {
+ try {
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+ message.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+ leader.forward(message, getContext());
+ } else {
+ noLeaderError(message);
}
}
}
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+ return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
transactionId, transactionChainId, clientVersion);
}