X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=02a14022c4366d1cef8380da0183e8ec19e56a4a;hb=175f38490b56c4b4e0ec356b17b91f887b295da4;hp=a647554806034f5b21d52e04dd8530dd6bb4d51d;hpb=6313c088fc7db266cc25b691e0cd909300fc8425;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index a647554806..02a14022c4 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -23,14 +23,13 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -52,8 +51,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -65,8 +62,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -82,6 +78,7 @@ import scala.concurrent.duration.FiniteDuration;
*
*/
public class Shard extends RaftActor {
+
@VisibleForTesting
static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
@Override
@@ -173,7 +170,8 @@ public class Shard extends RaftActor {
new Dispatchers(context().system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
- snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+ snapshotCohort = new ShardSnapshotCohort(builder.getId().getMemberName(), transactionActorFactory, store,
+ LOG, this.name);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
}
@@ -218,7 +216,7 @@ public class Shard extends RaftActor {
}
@Override
- protected void handleCommand(final Object message) {
+ protected void handleNonRaftCommand(final Object message) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional maybeError = context.error();
if (maybeError.isPresent()) {
@@ -269,8 +267,11 @@ public class Shard extends RaftActor {
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ commitCoordinator.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
- super.handleCommand(message);
+ super.handleNonRaftCommand(message);
}
}
}
@@ -327,7 +328,7 @@ public class Shard extends RaftActor {
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
- Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
+ persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
DataTreeCandidatePayload.create(candidate));
}
}
@@ -349,12 +350,13 @@ public class Shard extends RaftActor {
}
}
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+ private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
+ @Nonnull final CohortEntry cohortEntry) {
LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
try {
- cohortEntry.commit();
+ cohortEntry.commit();
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@ -391,7 +393,7 @@ public class Shard extends RaftActor {
}
}
- private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+ private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
// With persistence enabled, this method is called via applyState by the leader strategy
// after the commit has been replicated to a majority of the followers.
@@ -432,7 +434,7 @@ public class Shard extends RaftActor {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@ -447,7 +449,7 @@ public class Shard extends RaftActor {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
- commitCoordinator.handleBatchedModifications(batched, sender, this);
+ commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
@@ -516,7 +518,7 @@ public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
@@ -540,7 +542,8 @@ public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+ store.getSchemaContext());
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
@@ -561,7 +564,7 @@ public class Shard extends RaftActor {
doAbortTransaction(abort.getTransactionID(), getSender());
}
- void doAbortTransaction(final String transactionID, final ActorRef sender) {
+ void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
commitCoordinator.handleAbort(transactionID, sender, this);
}
@@ -580,13 +583,6 @@ public class Shard extends RaftActor {
store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
- private ActorRef createTypedTransactionActor(int transactionType,
- ShardTransactionIdentifier transactionId, String transactionChainId) {
-
- return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
- transactionId, transactionChainId);
- }
-
private void createTransaction(CreateTransaction createTransaction) {
try {
if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
@@ -595,7 +591,7 @@ public class Shard extends RaftActor {
}
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionId());
getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
@@ -604,24 +600,14 @@ public class Shard extends RaftActor {
}
}
- private ActorRef createTransaction(int transactionType, String remoteTransactionId,
- String transactionChainId) {
-
-
- ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
- }
-
- ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
- transactionChainId);
-
- return transactionActor;
+ private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+ return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
+ transactionId);
}
- private void commitWithNewTransaction(final Modification modification) {
- ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+ private void commitWithNewTransaction(final BatchedModifications modification) {
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
@@ -679,7 +665,7 @@ public class Shard extends RaftActor {
}
@Override
- protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if (data instanceof DataTreeCandidatePayload) {
if (clientActor == null) {
// No clientActor indicates a replica coming from the leader
@@ -692,22 +678,13 @@ public class Shard extends RaftActor {
// Replication consensus reached, proceed to commit
finishCommit(clientActor, identifier);
}
- } else if (data instanceof CompositeModificationPayload) {
- Object modification = ((CompositeModificationPayload) data).getModification();
-
- applyModificationToState(clientActor, identifier, modification);
- } else if(data instanceof CompositeModificationByteStringPayload ){
- Object modification = ((CompositeModificationByteStringPayload) data).getModification();
-
- applyModificationToState(clientActor, identifier, modification);
} else {
- LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- persistenceId(), data, data.getClass().getClassLoader(),
- CompositeModificationPayload.class.getClassLoader());
+ LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
+ data.getClass().getClassLoader());
}
}
- private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+ private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
if(modification == null) {
LOG.error(
"{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
@@ -715,7 +692,11 @@ public class Shard extends RaftActor {
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+
+ // The only implementation we know of is BatchedModifications, which also carries a transaction
+ // identifier -- which we really need that.
+ Preconditions.checkArgument(modification instanceof BatchedModifications);
+ commitWithNewTransaction((BatchedModifications)modification);
} else {
// This must be the OK to commit after replication consensus.
finishCommit(clientActor, identifier);