X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=a3ef0339b7571172dfb0d5b2338f52911a86cf9c;hp=af16d02eea2909ee75e83e2b62dc58bc633b7031;hb=413bae822cdbf37f4dc16ebe14cab621953e817a;hpb=cf161562786a13b5c68d0da10524359ee3a732e7 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index af16d02eea..a3ef0339b7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -94,11 +95,10 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { - private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); - private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; - public static final String DEFAULT_NAME = "default"; + @VisibleForTesting + static final String DEFAULT_NAME = "default"; // The state of this Shard private final InMemoryDOMDataStore store; @@ -132,7 +132,7 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; - private Optional roleChangeNotifier; + private final Optional roleChangeNotifier; /** * Coordinates persistence recovery on startup. @@ -320,8 +320,14 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - Shard.this.persistData(getSender(), transactionID, - new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); + // If we do not have any followers and we are not using persistence we can + // apply modification to the state immediately + if(!hasFollowers() && !persistence().isRecoveryApplicable()){ + applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); + } else { + Shard.this.persistData(getSender(), transactionID, + new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable())); + } } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); @@ -347,7 +353,7 @@ public class Shard extends RaftActor { cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { commitWithNewTransaction(cohortEntry.getModification()); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication // took so long to complete such that the cohort entry was expired from the cache. @@ -369,7 +375,7 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().commit().get(); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -405,7 +411,7 @@ public class Shard extends RaftActor { // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. ActorRef replyActorPath = self(); - if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); @@ -421,7 +427,7 @@ public class Shard extends RaftActor { doAbortTransaction(abort.getTransactionID(), getSender()); } - private void doAbortTransaction(final String transactionID, final ActorRef sender) { + void doAbortTransaction(final String transactionID, final ActorRef sender) { final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { LOG.debug("Aborting transaction {}", transactionID); @@ -440,7 +446,7 @@ public class Shard extends RaftActor { shardMBean.incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self); } } @@ -473,7 +479,7 @@ public class Shard extends RaftActor { // This must be for install snapshot. Don't want to open this up and trigger // deSerialization - self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), + self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)), self()); createSnapshotTransaction = null; @@ -493,7 +499,8 @@ public class Shard extends RaftActor { } private ActorRef createTypedTransactionActor(int transactionType, - ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { + ShardTransactionIdentifier transactionId, String transactionChainId, + short clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -561,7 +568,7 @@ public class Shard extends RaftActor { } private ActorRef createTransaction(int transactionType, String remoteTransactionId, - String transactionChainId, int clientVersion) { + String transactionChainId, short clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -652,7 +659,7 @@ public class Shard extends RaftActor { dataChangeListeners.add(dataChangeListenerPath); AsyncDataChangeListener> listener = - new DataChangeListenerProxy(schemaContext, dataChangeListenerPath); + new DataChangeListenerProxy(dataChangeListenerPath); LOG.debug("Registering for path {}", registerChangeListener.getPath()); @@ -679,6 +686,8 @@ public class Shard extends RaftActor { protected void appendRecoveredLogEntry(final Payload data) { if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); + } else if (data instanceof CompositeModificationByteStringPayload) { + currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); } else { LOG.error("Unknown state received {} during recovery", data); } @@ -755,19 +764,12 @@ public class Shard extends RaftActor { if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); - if(modification == null) { - LOG.error( - "modification is null - this is very unexpected, clientActor = {}, identifier = {}", - identifier, clientActor != null ? clientActor.path().toString() : null); - } else if(clientActor == null) { - // There's no clientActor to which to send a commit reply so we must be applying - // replicated state from the leader. - commitWithNewTransaction(MutableCompositeModification.fromSerializable( - modification, schemaContext)); - } else { - // This must be the OK to commit after replication consensus. - finishCommit(clientActor, identifier); - } + applyModificationToState(clientActor, identifier, modification); + } else if(data instanceof CompositeModificationByteStringPayload ){ + Object modification = ((CompositeModificationByteStringPayload) data).getModification(); + + applyModificationToState(clientActor, identifier, modification); + } else { LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), @@ -778,6 +780,22 @@ public class Shard extends RaftActor { } + private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { + if(modification == null) { + LOG.error( + "modification is null - this is very unexpected, clientActor = {}, identifier = {}", + identifier, clientActor != null ? clientActor.path().toString() : null); + } else if(clientActor == null) { + // There's no clientActor to which to send a commit reply so we must be applying + // replicated state from the leader. + commitWithNewTransaction(MutableCompositeModification.fromSerializable( + modification, schemaContext)); + } else { + // This must be the OK to commit after replication consensus. + finishCommit(clientActor, identifier); + } + } + private void updateJournalStats() { ReplicatedLogEntry lastLogEntry = getLastLogEntry(); @@ -788,7 +806,7 @@ public class Shard extends RaftActor { shardMBean.setCommitIndex(getCommitIndex()); shardMBean.setLastApplied(getLastApplied()); - shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize()); + shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize()); } @Override @@ -800,7 +818,7 @@ public class Shard extends RaftActor { createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());