X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=14a20da247f5138b17e3c30e437bf3c661e5ec25;hb=655216a6c75aa29d31c4c56c56a5000db56ba233;hp=e62e918e5e7eff53b8f7e00dbe1e8300d270b59f;hpb=fdddb482b07c3ee2f3ca853d09ee9a6ecdd7eb2a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index e62e918e5e..14a20da247 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -46,6 +46,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; @@ -115,7 +116,8 @@ public class Shard extends RaftActor { protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); + super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()), + DataStoreVersions.CURRENT_VERSION); this.name = name.toString(); this.datastoreContext = datastoreContext; @@ -228,6 +230,8 @@ public class Shard extends RaftActor { } else if (message instanceof ForwardedReadyTransaction) { commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message, getSender(), this); + } else if (message instanceof ReadyLocalTransaction) { + handleReadyLocalTransaction((ReadyLocalTransaction)message); } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { @@ -393,6 +397,15 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } + private void noLeaderError(Object message) { + // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make + // it more resilient in case we're in the process of electing a new leader. + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + "Could not find the leader for shard %s. This typically happens" + + " when the system is coming up or recovering and a leader is being elected. Try again" + + " later.", persistenceId()))), getSelf()); + } + private void handleBatchedModifications(BatchedModifications batched) { // This message is sent to prepare the modificationsa transaction directly on the Shard as an // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last @@ -423,12 +436,27 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); leader.forward(batched, getContext()); } else { - // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make - // it more resilient in case we're in the process of electing a new leader. - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( - "Could not find the leader for shard %s. This typically happens" + - " when the system is coming up or recovering and a leader is being elected. Try again" + - " later.", persistenceId()))), getSelf()); + noLeaderError(batched); + } + } + } + + private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { + if (isLeader()) { + try { + commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); + } catch (Exception e) { + LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(), + message.getTransactionID(), e); + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } + } else { + ActorSelection leader = getLeader(); + if (leader != null) { + LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader); + leader.forward(message, getContext()); + } else { + noLeaderError(message); } } } @@ -493,7 +521,7 @@ public class Shard extends RaftActor { ShardTransactionIdentifier transactionId, String transactionChainId, short clientVersion ) { - return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType), + return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType), transactionId, transactionChainId, clientVersion); }