X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=0b4abe98a10a9ecac5be58a036f0653bcece5e8c;hb=4379f102fa0c85abf58f60d81fec9c698582fb1a;hp=14a20da247f5138b17e3c30e437bf3c661e5ec25;hpb=f12d62d2dc28a883c1f1b38df7d72a9142c2abfb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 14a20da247..0b4abe98a1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -84,6 +84,9 @@ public class Shard extends RaftActor { private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; + @VisibleForTesting + static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage"; + @VisibleForTesting static final String DEFAULT_NAME = "default"; @@ -137,7 +140,7 @@ public class Shard extends RaftActor { } commitCoordinator = new ShardCommitCoordinator(store, - TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), + datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(), datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); setTransactionCommitTimeout(); @@ -157,7 +160,7 @@ public class Shard extends RaftActor { private void setTransactionCommitTimeout() { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( - datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2; } public static Props props(final ShardIdentifier name, @@ -256,9 +259,11 @@ public class Shard extends RaftActor { onDatastoreContext((DatastoreContext)message); } else if(message instanceof RegisterRoleChangeListener){ roleChangeNotifier.get().forward(message, context()); - } else if (message instanceof FollowerInitialSyncUpStatus){ + } else if (message instanceof FollowerInitialSyncUpStatus) { shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone()); context().parent().tell(message, self()); + } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){ + sender().tell(getShardMBean(), self()); } else { super.onReceiveCommand(message); } @@ -273,9 +278,10 @@ public class Shard extends RaftActor { } @Override - protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { return new ShardLeaderStateChanged(memberId, leaderId, - isLeader() ? Optional.of(store.getDataTree()) : Optional.absent()); + isLeader() ? Optional.of(store.getDataTree()) : Optional.absent(), + leaderPayloadVersion); } private void onDatastoreContext(DatastoreContext context) { @@ -297,14 +303,15 @@ public class Shard extends RaftActor { private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { - long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime(); - if(elapsed > transactionCommitTimeout) { + if(cohortEntry.isExpired(transactionCommitTimeout)) { LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting", persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout); doAbortTransaction(cohortEntry.getTransactionID(), null); } } + + commitCoordinator.cleanupExpiredCohortEntries(); } private static boolean isEmptyCommit(final DataTreeCandidate candidate) { @@ -318,9 +325,9 @@ public class Shard extends RaftActor { // or if cohortEntry has no modifications // we can apply modification to the state immediately if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { - applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate); + applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate); } else { - Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), + Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), DataTreeCandidatePayload.create(candidate)); } } @@ -407,7 +414,7 @@ public class Shard extends RaftActor { } private void handleBatchedModifications(BatchedModifications batched) { - // This message is sent to prepare the modificationsa transaction directly on the Shard as an + // This message is sent to prepare the modifications transaction directly on the Shard as an // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last // BatchedModifications message, the caller sets the ready flag in the message indicating // modifications are complete. The reply contains the cohort actor path (this actor) for the caller @@ -446,14 +453,15 @@ public class Shard extends RaftActor { try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { - LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(), + LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionID(), e); getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } } else { ActorSelection leader = getLeader(); if (leader != null) { - LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader); + LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); + message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); } else { noLeaderError(message); @@ -582,7 +590,8 @@ public class Shard extends RaftActor { } @Override - protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + @VisibleForTesting + public RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return snapshotCohort; } @@ -594,6 +603,7 @@ public class Shard extends RaftActor { @Override protected void onRecoveryComplete() { + store.recoveryDone(); //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf());