X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=a3ef0339b7571172dfb0d5b2338f52911a86cf9c;hp=7d6dde9c8af296df1b82f331f431daaa0d431832;hb=3f153e5fa694fe4147e72e615edbb5c263e5a394;hpb=deb9baa6423b052d51bb4ea354b8582aac76e41e diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7d6dde9c8a..a3ef0339b7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -95,11 +95,10 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { - private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); - private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; - public static final String DEFAULT_NAME = "default"; + @VisibleForTesting + static final String DEFAULT_NAME = "default"; // The state of this Shard private final InMemoryDOMDataStore store; @@ -133,7 +132,7 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; - private Optional roleChangeNotifier; + private final Optional roleChangeNotifier; /** * Coordinates persistence recovery on startup. @@ -321,8 +320,14 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - Shard.this.persistData(getSender(), transactionID, - new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable())); + // If we do not have any followers and we are not using persistence we can + // apply modification to the state immediately + if(!hasFollowers() && !persistence().isRecoveryApplicable()){ + applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); + } else { + Shard.this.persistData(getSender(), transactionID, + new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable())); + } } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); @@ -348,7 +353,7 @@ public class Shard extends RaftActor { cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { commitWithNewTransaction(cohortEntry.getModification()); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication // took so long to complete such that the cohort entry was expired from the cache. @@ -370,7 +375,7 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().commit().get(); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -406,7 +411,7 @@ public class Shard extends RaftActor { // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. ActorRef replyActorPath = self(); - if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); @@ -422,7 +427,7 @@ public class Shard extends RaftActor { doAbortTransaction(abort.getTransactionID(), getSender()); } - private void doAbortTransaction(final String transactionID, final ActorRef sender) { + void doAbortTransaction(final String transactionID, final ActorRef sender) { final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { LOG.debug("Aborting transaction {}", transactionID); @@ -441,7 +446,7 @@ public class Shard extends RaftActor { shardMBean.incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self); } } @@ -474,7 +479,7 @@ public class Shard extends RaftActor { // This must be for install snapshot. Don't want to open this up and trigger // deSerialization - self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), + self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)), self()); createSnapshotTransaction = null; @@ -494,7 +499,8 @@ public class Shard extends RaftActor { } private ActorRef createTypedTransactionActor(int transactionType, - ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { + ShardTransactionIdentifier transactionId, String transactionChainId, + short clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -562,7 +568,7 @@ public class Shard extends RaftActor { } private ActorRef createTransaction(int transactionType, String remoteTransactionId, - String transactionChainId, int clientVersion) { + String transactionChainId, short clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -653,7 +659,7 @@ public class Shard extends RaftActor { dataChangeListeners.add(dataChangeListenerPath); AsyncDataChangeListener> listener = - new DataChangeListenerProxy(schemaContext, dataChangeListenerPath); + new DataChangeListenerProxy(dataChangeListenerPath); LOG.debug("Registering for path {}", registerChangeListener.getPath()); @@ -812,7 +818,7 @@ public class Shard extends RaftActor { createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());