X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=dea377a810cb73be01cf4c17dab93c4c2b476c1b;hb=0d4c11af06567b4692b8894bbe2cac16cb4db0ad;hp=7d6dde9c8af296df1b82f331f431daaa0d431832;hpb=da873284e3ed90af065b81ca219dfa72ea5545a3;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7d6dde9c8a..dea377a810 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -63,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; @@ -95,11 +97,10 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { - private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); - private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; - public static final String DEFAULT_NAME = "default"; + @VisibleForTesting + static final String DEFAULT_NAME = "default"; // The state of this Shard private final InMemoryDOMDataStore store; @@ -133,7 +134,7 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; - private Optional roleChangeNotifier; + private final Optional roleChangeNotifier; /** * Coordinates persistence recovery on startup. @@ -321,9 +322,15 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - Shard.this.persistData(getSender(), transactionID, - new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable())); - } catch (InterruptedException | ExecutionException e) { + // If we do not have any followers and we are not using persistence we can + // apply modification to the state immediately + if(!hasFollowers() && !persistence().isRecoveryApplicable()){ + applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); + } else { + Shard.this.persistData(getSender(), transactionID, + new ModificationPayload(cohortEntry.getModification())); + } + } catch (InterruptedException | ExecutionException | IOException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); shardMBean.incrementFailedTransactionsCount(); @@ -348,7 +355,7 @@ public class Shard extends RaftActor { cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { commitWithNewTransaction(cohortEntry.getModification()); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication // took so long to complete such that the cohort entry was expired from the cache. @@ -370,7 +377,7 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().commit().get(); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -406,7 +413,7 @@ public class Shard extends RaftActor { // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. ActorRef replyActorPath = self(); - if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); @@ -422,7 +429,7 @@ public class Shard extends RaftActor { doAbortTransaction(abort.getTransactionID(), getSender()); } - private void doAbortTransaction(final String transactionID, final ActorRef sender) { + void doAbortTransaction(final String transactionID, final ActorRef sender) { final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { LOG.debug("Aborting transaction {}", transactionID); @@ -441,7 +448,7 @@ public class Shard extends RaftActor { shardMBean.incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self); } } @@ -474,7 +481,7 @@ public class Shard extends RaftActor { // This must be for install snapshot. Don't want to open this up and trigger // deSerialization - self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), + self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)), self()); createSnapshotTransaction = null; @@ -494,7 +501,8 @@ public class Shard extends RaftActor { } private ActorRef createTypedTransactionActor(int transactionType, - ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { + ShardTransactionIdentifier transactionId, String transactionChainId, + short clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -562,7 +570,7 @@ public class Shard extends RaftActor { } private ActorRef createTransaction(int transactionType, String remoteTransactionId, - String transactionChainId, int clientVersion) { + String transactionChainId, short clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -653,7 +661,7 @@ public class Shard extends RaftActor { dataChangeListeners.add(dataChangeListenerPath); AsyncDataChangeListener> listener = - new DataChangeListenerProxy(schemaContext, dataChangeListenerPath); + new DataChangeListenerProxy(dataChangeListenerPath); LOG.debug("Registering for path {}", registerChangeListener.getPath()); @@ -678,7 +686,13 @@ public class Shard extends RaftActor { @Override protected void appendRecoveredLogEntry(final Payload data) { - if (data instanceof CompositeModificationPayload) { + if(data instanceof ModificationPayload) { + try { + currentLogRecoveryBatch.add(((ModificationPayload) data).getModification()); + } catch (ClassNotFoundException | IOException e) { + LOG.error(e, "Error extracting ModificationPayload"); + } + } else if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); } else if (data instanceof CompositeModificationByteStringPayload) { currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); @@ -755,7 +769,14 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { - if (data instanceof CompositeModificationPayload) { + if(data instanceof ModificationPayload) { + try { + applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); + } catch (ClassNotFoundException | IOException e) { + LOG.error(e, "Error extracting ModificationPayload"); + } + } + else if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); applyModificationToState(clientActor, identifier, modification); @@ -763,7 +784,6 @@ public class Shard extends RaftActor { Object modification = ((CompositeModificationByteStringPayload) data).getModification(); applyModificationToState(clientActor, identifier, modification); - } else { LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), @@ -782,8 +802,7 @@ public class Shard extends RaftActor { } else if(clientActor == null) { // There's no clientActor to which to send a commit reply so we must be applying // replicated state from the leader. - commitWithNewTransaction(MutableCompositeModification.fromSerializable( - modification, schemaContext)); + commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification)); } else { // This must be the OK to commit after replication consensus. finishCommit(clientActor, identifier); @@ -812,7 +831,7 @@ public class Shard extends RaftActor { createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", - CreateTransaction.CURRENT_VERSION); + DataStoreVersions.CURRENT_VERSION); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());