X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=71afb5c9b7394215406926306f7555b034231b47;hb=879a3015b313694d8158e9fec151ce467f18a065;hp=62d3259a714059f497ca3e97ad08166a3a1106bf;hpb=ba3433c7bf94e551a4ea520c95165a9358bf9227;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 62d3259a71..71afb5c9b7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -46,14 +46,17 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; @@ -63,6 +66,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; @@ -80,6 +84,9 @@ public class Shard extends RaftActor { private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; + @VisibleForTesting + static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage"; + @VisibleForTesting static final String DEFAULT_NAME = "default"; @@ -112,7 +119,8 @@ public class Shard extends RaftActor { protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); + super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()), + DataStoreVersions.CURRENT_VERSION); this.name = name.toString(); this.datastoreContext = datastoreContext; @@ -132,7 +140,7 @@ public class Shard extends RaftActor { } commitCoordinator = new ShardCommitCoordinator(store, - TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), + datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(), datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); setTransactionCommitTimeout(); @@ -152,7 +160,7 @@ public class Shard extends RaftActor { private void setTransactionCommitTimeout() { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( - datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2; } public static Props props(final ShardIdentifier name, @@ -225,6 +233,8 @@ public class Shard extends RaftActor { } else if (message instanceof ForwardedReadyTransaction) { commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message, getSender(), this); + } else if (message instanceof ReadyLocalTransaction) { + handleReadyLocalTransaction((ReadyLocalTransaction)message); } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { @@ -249,9 +259,11 @@ public class Shard extends RaftActor { onDatastoreContext((DatastoreContext)message); } else if(message instanceof RegisterRoleChangeListener){ roleChangeNotifier.get().forward(message, context()); - } else if (message instanceof FollowerInitialSyncUpStatus){ + } else if (message instanceof FollowerInitialSyncUpStatus) { shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone()); context().parent().tell(message, self()); + } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){ + sender().tell(getShardMBean(), self()); } else { super.onReceiveCommand(message); } @@ -265,6 +277,12 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + @Override + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { + return new ShardLeaderStateChanged(memberId, leaderId, + isLeader() ? Optional.of(store.getDataTree()) : Optional.absent()); + } + private void onDatastoreContext(DatastoreContext context) { datastoreContext = context; @@ -284,14 +302,15 @@ public class Shard extends RaftActor { private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { - long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime(); - if(elapsed > transactionCommitTimeout) { + if(cohortEntry.isExpired(transactionCommitTimeout)) { LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting", persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout); doAbortTransaction(cohortEntry.getTransactionID(), null); } } + + commitCoordinator.cleanupExpiredCohortEntries(); } private static boolean isEmptyCommit(final DataTreeCandidate candidate) { @@ -305,9 +324,9 @@ public class Shard extends RaftActor { // or if cohortEntry has no modifications // we can apply modification to the state immediately if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { - applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate); + applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate); } else { - Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), + Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), DataTreeCandidatePayload.create(candidate)); } } @@ -384,8 +403,17 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } + private void noLeaderError(Object message) { + // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make + // it more resilient in case we're in the process of electing a new leader. + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + "Could not find the leader for shard %s. This typically happens" + + " when the system is coming up or recovering and a leader is being elected. Try again" + + " later.", persistenceId()))), getSelf()); + } + private void handleBatchedModifications(BatchedModifications batched) { - // This message is sent to prepare the modificationsa transaction directly on the Shard as an + // This message is sent to prepare the modifications transaction directly on the Shard as an // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last // BatchedModifications message, the caller sets the ready flag in the message indicating // modifications are complete. The reply contains the cohort actor path (this actor) for the caller @@ -414,12 +442,27 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); leader.forward(batched, getContext()); } else { - // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make - // it more resilient in case we're in the process of electing a new leader. - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( - "Could not find the leader for shard %s. This typically happens" + - " when the system is coming up or recovering and a leader is being elected. Try again" + - " later.", persistenceId()))), getSelf()); + noLeaderError(batched); + } + } + } + + private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { + if (isLeader()) { + try { + commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); + } catch (Exception e) { + LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(), + message.getTransactionID(), e); + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } + } else { + ActorSelection leader = getLeader(); + if (leader != null) { + LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader); + leader.forward(message, getContext()); + } else { + noLeaderError(message); } } } @@ -484,7 +527,7 @@ public class Shard extends RaftActor { ShardTransactionIdentifier transactionId, String transactionChainId, short clientVersion ) { - return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType), + return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType), transactionId, transactionChainId, clientVersion); } @@ -557,6 +600,7 @@ public class Shard extends RaftActor { @Override protected void onRecoveryComplete() { + store.recoveryDone(); //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf());