X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=d52d3a3bf20e576fce0556e866a53cf74d7c6a4d;hb=925cb4a228d0fda99c7bfeb432eb25285a223887;hp=1b12462a4ab8e4125788f45de3e76f9e1417f6a9;hpb=edd61d79da614388134b0e0a618010c91e9c91bd;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 1b12462a4a..d52d3a3bf2 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -71,10 +71,9 @@ import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
- * A Shard represents a portion of the logical data tree
- *
+ * A Shard represents a portion of the logical data tree.
+ *
* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
- *
*/
public class Shard extends RaftActor {
@@ -147,7 +146,7 @@ public class Shard extends RaftActor {
new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
- if(builder.getDataTree() != null) {
+ if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name);
} else {
@@ -262,16 +261,16 @@ public class Shard extends RaftActor {
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
- } else if (message instanceof RegisterRoleChangeListener){
+ } else if (message instanceof RegisterRoleChangeListener) {
roleChangeNotifier.get().forward(message, context());
} else if (message instanceof FollowerInitialSyncUpStatus) {
shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
context().parent().tell(message, self());
- } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)){
+ } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)) {
sender().tell(getShardMBean(), self());
} else if (message instanceof GetShardDataTree) {
sender().tell(store.getDataTree(), self());
- } else if (message instanceof ServerRemoved){
+ } else if (message instanceof ServerRemoved) {
context().parent().forward(message, context());
} else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
@@ -302,7 +301,8 @@ public class Shard extends RaftActor {
}
@Override
- protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) {
+ protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ final short leaderPayloadVersion) {
return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
: new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
@@ -312,11 +312,7 @@ public class Shard extends RaftActor {
setTransactionCommitTimeout();
- if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
- setPersistence(true);
- } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
- setPersistence(false);
- }
+ setPersistence(datastoreContext.isPersistent());
updateConfigParams(datastoreContext.getShardRaftConfig());
}
@@ -352,7 +348,7 @@ public class Shard extends RaftActor {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@ -365,6 +361,7 @@ public class Shard extends RaftActor {
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
@@ -401,8 +398,9 @@ public class Shard extends RaftActor {
// we need to reconstruct previous BatchedModifications from the transaction
// DataTreeModification, honoring the max batched modification count, and forward all the
// previous BatchedModifications to the new leader.
- Collection newModifications = commitCoordinator.createForwardedBatchedModifications(
- batched, datastoreContext.getShardBatchedModificationCount());
+ Collection newModifications = commitCoordinator
+ .createForwardedBatchedModifications(batched,
+ datastoreContext.getShardBatchedModificationCount());
LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
newModifications.size(), leader);
@@ -417,8 +415,8 @@ public class Shard extends RaftActor {
private boolean failIfIsolatedLeader(final ActorRef sender) {
if (isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Shard %s was the leader but has lost contact with all of its followers. Either all" +
- " other follower nodes are down or this node is isolated by a network partition.",
+ "Shard %s was the leader but has lost contact with all of its followers. Either all"
+ + " other follower nodes are down or this node is isolated by a network partition.",
persistenceId()))), getSelf());
return true;
}
@@ -430,6 +428,7 @@ public class Shard extends RaftActor {
return getRaftState() == RaftState.IsolatedLeader;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
@@ -500,10 +499,11 @@ public class Shard extends RaftActor {
store.closeTransactionChain(closeTransactionChain.getIdentifier());
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void createTransaction(final CreateTransaction createTransaction) {
try {
- if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
- failIfIsolatedLeader(getSender())) {
+ if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
+ && failIfIsolatedLeader(getSender())) {
return;
}
@@ -614,8 +614,7 @@ public class Shard extends RaftActor {
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
if (leader != null) {
- Collection> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
- datastoreContext.getShardBatchedModificationCount());
+ Collection> messagesToForward = convertPendingTransactionsToMessages();
if (!messagesToForward.isEmpty()) {
LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
@@ -626,9 +625,8 @@ public class Shard extends RaftActor {
}
}
} else {
- commitCoordinator.abortPendingTransactions(
- "The transacton was aborted due to inflight leadership change and the leader address isn't available.",
- this);
+ commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ + "change and the leader address isn't available.", this);
}
}
@@ -637,6 +635,16 @@ public class Shard extends RaftActor {
}
}
+ /**
+ * Clears all pending transactions and converts them to messages to be forwarded to a new leader.
+ *
+ * @return the converted messages
+ */
+ public Collection> convertPendingTransactionsToMessages() {
+ return commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
+ }
+
@Override
protected void pauseLeader(final Runnable operation) {
LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
@@ -671,7 +679,7 @@ public class Shard extends RaftActor {
return new Builder();
}
- public static abstract class AbstractBuilder, S extends Shard> {
+ public abstract static class AbstractBuilder, S extends Shard> {
private final Class shardClass;
private ShardIdentifier id;
private Map peerAddresses = Collections.emptyMap();
@@ -694,39 +702,39 @@ public class Shard extends RaftActor {
return (T) this;
}
- public T id(final ShardIdentifier id) {
+ public T id(final ShardIdentifier newId) {
checkSealed();
- this.id = id;
+ this.id = newId;
return self();
}
- public T peerAddresses(final Map peerAddresses) {
+ public T peerAddresses(final Map newPeerAddresses) {
checkSealed();
- this.peerAddresses = peerAddresses;
+ this.peerAddresses = newPeerAddresses;
return self();
}
- public T datastoreContext(final DatastoreContext datastoreContext) {
+ public T datastoreContext(final DatastoreContext newDatastoreContext) {
checkSealed();
- this.datastoreContext = datastoreContext;
+ this.datastoreContext = newDatastoreContext;
return self();
}
- public T schemaContext(final SchemaContext schemaContext) {
+ public T schemaContext(final SchemaContext newSchemaContext) {
checkSealed();
- this.schemaContext = schemaContext;
+ this.schemaContext = newSchemaContext;
return self();
}
- public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+ public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot newRestoreFromSnapshot) {
checkSealed();
- this.restoreFromSnapshot = restoreFromSnapshot;
+ this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T dataTree(final TipProducingDataTree dataTree) {
+ public T dataTree(final TipProducingDataTree newDataTree) {
checkSealed();
- this.dataTree = dataTree;
+ this.dataTree = newDataTree;
return self();
}
@@ -756,13 +764,14 @@ public class Shard extends RaftActor {
public TreeType getTreeType() {
switch (datastoreContext.getLogicalStoreType()) {
- case CONFIGURATION:
- return TreeType.CONFIGURATION;
- case OPERATIONAL:
- return TreeType.OPERATIONAL;
+ case CONFIGURATION:
+ return TreeType.CONFIGURATION;
+ case OPERATIONAL:
+ return TreeType.OPERATIONAL;
+ default:
+ throw new IllegalStateException("Unhandled logical store type "
+ + datastoreContext.getLogicalStoreType());
}
-
- throw new IllegalStateException("Unhandled logical store type " + datastoreContext.getLogicalStoreType());
}
protected void verify() {