X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=d52d3a3bf20e576fce0556e866a53cf74d7c6a4d;hp=1b12462a4ab8e4125788f45de3e76f9e1417f6a9;hb=925cb4a228d0fda99c7bfeb432eb25285a223887;hpb=edd61d79da614388134b0e0a618010c91e9c91bd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 1b12462a4a..d52d3a3bf2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -71,10 +71,9 @@ import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** - * A Shard represents a portion of the logical data tree
- *

+ * A Shard represents a portion of the logical data tree. + *

* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it - *

*/ public class Shard extends RaftActor { @@ -147,7 +146,7 @@ public class Shard extends RaftActor { new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"); ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"); - if(builder.getDataTree() != null) { + if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), treeChangeListenerPublisher, dataChangeListenerPublisher, name); } else { @@ -262,16 +261,16 @@ public class Shard extends RaftActor { commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); } else if (message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); - } else if (message instanceof RegisterRoleChangeListener){ + } else if (message instanceof RegisterRoleChangeListener) { roleChangeNotifier.get().forward(message, context()); } else if (message instanceof FollowerInitialSyncUpStatus) { shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone()); context().parent().tell(message, self()); - } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)){ + } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)) { sender().tell(getShardMBean(), self()); } else if (message instanceof GetShardDataTree) { sender().tell(store.getDataTree(), self()); - } else if (message instanceof ServerRemoved){ + } else if (message instanceof ServerRemoved) { context().parent().forward(message, context()); } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { messageRetrySupport.onTimerMessage(message); @@ -302,7 +301,8 @@ public class Shard extends RaftActor { } @Override - protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { + protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, + final short leaderPayloadVersion) { return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion) : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } @@ -312,11 +312,7 @@ public class Shard extends RaftActor { setTransactionCommitTimeout(); - if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { - setPersistence(true); - } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { - setPersistence(false); - } + setPersistence(datastoreContext.isPersistent()); updateConfigParams(datastoreContext.getShardRaftConfig()); } @@ -352,7 +348,7 @@ public class Shard extends RaftActor { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); + commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -365,6 +361,7 @@ public class Shard extends RaftActor { } } + @SuppressWarnings("checkstyle:IllegalCatch") protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { try { commitCoordinator.handleBatchedModifications(batched, sender, this); @@ -401,8 +398,9 @@ public class Shard extends RaftActor { // we need to reconstruct previous BatchedModifications from the transaction // DataTreeModification, honoring the max batched modification count, and forward all the // previous BatchedModifications to the new leader. - Collection newModifications = commitCoordinator.createForwardedBatchedModifications( - batched, datastoreContext.getShardBatchedModificationCount()); + Collection newModifications = commitCoordinator + .createForwardedBatchedModifications(batched, + datastoreContext.getShardBatchedModificationCount()); LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(), newModifications.size(), leader); @@ -417,8 +415,8 @@ public class Shard extends RaftActor { private boolean failIfIsolatedLeader(final ActorRef sender) { if (isIsolatedLeader()) { sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( - "Shard %s was the leader but has lost contact with all of its followers. Either all" + - " other follower nodes are down or this node is isolated by a network partition.", + "Shard %s was the leader but has lost contact with all of its followers. Either all" + + " other follower nodes are down or this node is isolated by a network partition.", persistenceId()))), getSelf()); return true; } @@ -430,6 +428,7 @@ public class Shard extends RaftActor { return getRaftState() == RaftState.IsolatedLeader; } + @SuppressWarnings("checkstyle:IllegalCatch") private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID()); @@ -500,10 +499,11 @@ public class Shard extends RaftActor { store.closeTransactionChain(closeTransactionChain.getIdentifier()); } + @SuppressWarnings("checkstyle:IllegalCatch") private void createTransaction(final CreateTransaction createTransaction) { try { - if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && - failIfIsolatedLeader(getSender())) { + if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY + && failIfIsolatedLeader(getSender())) { return; } @@ -614,8 +614,7 @@ public class Shard extends RaftActor { // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); if (leader != null) { - Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( - datastoreContext.getShardBatchedModificationCount()); + Collection messagesToForward = convertPendingTransactionsToMessages(); if (!messagesToForward.isEmpty()) { LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(), @@ -626,9 +625,8 @@ public class Shard extends RaftActor { } } } else { - commitCoordinator.abortPendingTransactions( - "The transacton was aborted due to inflight leadership change and the leader address isn't available.", - this); + commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership " + + "change and the leader address isn't available.", this); } } @@ -637,6 +635,16 @@ public class Shard extends RaftActor { } } + /** + * Clears all pending transactions and converts them to messages to be forwarded to a new leader. + * + * @return the converted messages + */ + public Collection convertPendingTransactionsToMessages() { + return commitCoordinator.convertPendingTransactionsToMessages( + datastoreContext.getShardBatchedModificationCount()); + } + @Override protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); @@ -671,7 +679,7 @@ public class Shard extends RaftActor { return new Builder(); } - public static abstract class AbstractBuilder, S extends Shard> { + public abstract static class AbstractBuilder, S extends Shard> { private final Class shardClass; private ShardIdentifier id; private Map peerAddresses = Collections.emptyMap(); @@ -694,39 +702,39 @@ public class Shard extends RaftActor { return (T) this; } - public T id(final ShardIdentifier id) { + public T id(final ShardIdentifier newId) { checkSealed(); - this.id = id; + this.id = newId; return self(); } - public T peerAddresses(final Map peerAddresses) { + public T peerAddresses(final Map newPeerAddresses) { checkSealed(); - this.peerAddresses = peerAddresses; + this.peerAddresses = newPeerAddresses; return self(); } - public T datastoreContext(final DatastoreContext datastoreContext) { + public T datastoreContext(final DatastoreContext newDatastoreContext) { checkSealed(); - this.datastoreContext = datastoreContext; + this.datastoreContext = newDatastoreContext; return self(); } - public T schemaContext(final SchemaContext schemaContext) { + public T schemaContext(final SchemaContext newSchemaContext) { checkSealed(); - this.schemaContext = schemaContext; + this.schemaContext = newSchemaContext; return self(); } - public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) { + public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot newRestoreFromSnapshot) { checkSealed(); - this.restoreFromSnapshot = restoreFromSnapshot; + this.restoreFromSnapshot = newRestoreFromSnapshot; return self(); } - public T dataTree(final TipProducingDataTree dataTree) { + public T dataTree(final TipProducingDataTree newDataTree) { checkSealed(); - this.dataTree = dataTree; + this.dataTree = newDataTree; return self(); } @@ -756,13 +764,14 @@ public class Shard extends RaftActor { public TreeType getTreeType() { switch (datastoreContext.getLogicalStoreType()) { - case CONFIGURATION: - return TreeType.CONFIGURATION; - case OPERATIONAL: - return TreeType.OPERATIONAL; + case CONFIGURATION: + return TreeType.CONFIGURATION; + case OPERATIONAL: + return TreeType.OPERATIONAL; + default: + throw new IllegalStateException("Unhandled logical store type " + + datastoreContext.getLogicalStoreType()); } - - throw new IllegalStateException("Unhandled logical store type " + datastoreContext.getLogicalStoreType()); } protected void verify() {