X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=1b12462a4ab8e4125788f45de3e76f9e1417f6a9;hp=bd44f0b41d3ba30006f750b35199a2dcd58d7f44;hb=edd61d79da614388134b0e0a618010c91e9c91bd;hpb=b70b396725749d3fd6ca761f02f4b630f6f4f1ce diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index bd44f0b41d..1b12462a4a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -16,12 +16,11 @@ import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; +import com.google.common.base.Ticker; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -39,7 +38,6 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; @@ -52,8 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -67,9 +63,8 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; @@ -134,7 +129,9 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; - protected Shard(AbstractBuilder builder) { + private final FrontendMetadata frontendMetadata = new FrontendMetadata(); + + protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -146,9 +143,17 @@ public class Shard extends RaftActor { LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); - store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType(), - new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"), - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"), name); + ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = + new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"); + ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = + new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"); + if(builder.getDataTree() != null) { + store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), + treeChangeListenerPublisher, dataChangeListenerPublisher, name); + } else { + store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), + treeChangeListenerPublisher, dataChangeListenerPublisher, name); + } shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); @@ -158,9 +163,7 @@ public class Shard extends RaftActor { getContext().become(new MeteringBehavior(this)); } - commitCoordinator = new ShardCommitCoordinator(store, - datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(), - datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, this.name); + commitCoordinator = new ShardCommitCoordinator(store, LOG, this.name); setTransactionCommitTimeout(); @@ -185,7 +188,7 @@ public class Shard extends RaftActor { datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2; } - private Optional createRoleChangeNotifier(String shardId) { + private Optional createRoleChangeNotifier(final String shardId) { ActorRef shardRoleChangeNotifier = this.getContext().actorOf( RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); return Optional.of(shardRoleChangeNotifier); @@ -199,7 +202,7 @@ public class Shard extends RaftActor { messageRetrySupport.close(); - if(txCommitTimeoutCheckSchedule != null) { + if (txCommitTimeoutCheckSchedule != null) { txCommitTimeoutCheckSchedule.cancel(); } @@ -255,24 +258,25 @@ public class Shard extends RaftActor { setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress()); } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { + store.checkForExpiredTransactions(transactionCommitTimeout); commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); - } else if(message instanceof DatastoreContext) { + } else if (message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); - } else if(message instanceof RegisterRoleChangeListener){ + } else if (message instanceof RegisterRoleChangeListener){ roleChangeNotifier.get().forward(message, context()); } else if (message instanceof FollowerInitialSyncUpStatus) { shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone()); context().parent().tell(message, self()); - } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){ + } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)){ sender().tell(getShardMBean(), self()); - } else if(message instanceof GetShardDataTree) { + } else if (message instanceof GetShardDataTree) { sender().tell(store.getDataTree(), self()); - } else if(message instanceof ServerRemoved){ + } else if (message instanceof ServerRemoved){ context().parent().forward(message, context()); - } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { + } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { messageRetrySupport.onTimerMessage(message); } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { - commitCoordinator.processCohortRegistryCommand(getSender(), + store.processCohortRegistryCommand(getSender(), (DataTreeCohortActorRegistry.CohortRegistryCommand) message); } else { super.handleNonRaftCommand(message); @@ -285,7 +289,7 @@ public class Shard extends RaftActor { } public int getPendingTxCommitQueueSize() { - return commitCoordinator.getQueueSize(); + return store.getQueueSize(); } public int getCohortCacheSize() { @@ -298,61 +302,40 @@ public class Shard extends RaftActor { } @Override - protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { + protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion) : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } - protected void onDatastoreContext(DatastoreContext context) { + protected void onDatastoreContext(final DatastoreContext context) { datastoreContext = context; - commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity()); - setTransactionCommitTimeout(); - if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { + if (datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) { setPersistence(true); - } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { + } else if (!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) { setPersistence(false); } updateConfigParams(datastoreContext.getShardRaftConfig()); } - private static boolean isEmptyCommit(final DataTreeCandidate candidate) { - return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType()); + boolean canSkipPayload() { + // If we do not have any followers and we are not using persistence we can apply modification to the state + // immediately + return !hasFollowers() && !persistence().isRecoveryApplicable(); } - void continueCommit(final CohortEntry cohortEntry) { - final DataTreeCandidate candidate = cohortEntry.getCandidate(); - final TransactionIdentifier transactionId = cohortEntry.getTransactionID(); - - // If we do not have any followers and we are not using persistence - // or if cohortEntry has no modifications - // we can apply modification to the state immediately - if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { - applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate); - return; - } - - final Payload payload; - try { - payload = CommitTransactionPayload.create(transactionId, candidate); - } catch (IOException e) { - LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate, - e); - // TODO: do we need to do something smarter here? - throw Throwables.propagate(e); - } - - persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload); + // applyState() will be invoked once consensus is reached on the payload + void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { + // We are faking the sender + persistData(self(), transactionId, payload); } private void handleCommitTransaction(final CommitTransaction commit) { if (isLeader()) { - if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { - shardMBean.incrementFailedTransactionsCount(); - } + commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -365,86 +348,6 @@ public class Shard extends RaftActor { } } - private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID, - @Nonnull final CohortEntry cohortEntry) { - LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); - - try { - try { - cohortEntry.commit(); - } catch(ExecutionException e) { - // We may get a "store tree and candidate base differ" IllegalStateException from commit under - // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last - // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before - // applying it to the state. We then become the leader and a second tx is pre-committed and - // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign - // candidate via applyState prior to the second tx. Since the second tx has already been - // pre-committed, when it gets here to commit it will get an IllegalStateException. - - // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner - // solution will be forthcoming. - if(e.getCause() instanceof IllegalStateException) { - LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(), - transactionID, e); - store.applyForeignCandidate(transactionID, cohortEntry.getCandidate()); - } else { - throw e; - } - } - - sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf()); - - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - - } catch (Exception e) { - sender.tell(new akka.actor.Status.Failure(e), getSelf()); - - LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), - transactionID, e); - shardMBean.incrementFailedTransactionsCount(); - } finally { - commitCoordinator.currentTransactionComplete(transactionID, true); - } - } - - private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) { - // With persistence enabled, this method is called via applyState by the leader strategy - // after the commit has been replicated to a majority of the followers. - - CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); - if (cohortEntry == null) { - // The transaction is no longer the current commit. This can happen if the transaction - // was aborted prior, most likely due to timeout in the front-end. We need to finish - // committing the transaction though since it was successfully persisted and replicated - // however we can't use the original cohort b/c it was already preCommitted and may - // conflict with the current commit or may have been aborted so we commit with a new - // transaction. - cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); - if(cohortEntry != null) { - try { - store.applyForeignCandidate(transactionID, cohortEntry.getCandidate()); - } catch (DataValidationFailedException e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e); - } - - sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), - getSelf()); - } else { - // This really shouldn't happen - it likely means that persistence or replication - // took so long to complete such that the cohort entry was expired from the cache. - IllegalStateException ex = new IllegalStateException( - String.format("%s: Could not finish committing transaction %s - no CohortEntry found", - persistenceId(), transactionID)); - LOG.error(ex.getMessage()); - sender.tell(new akka.actor.Status.Failure(ex), getSelf()); - } - } else { - finishCommit(sender, transactionID, cohortEntry); - } - } - private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); @@ -462,7 +365,7 @@ public class Shard extends RaftActor { } } - protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { + protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) { try { commitCoordinator.handleBatchedModifications(batched, sender, this); } catch (Exception e) { @@ -472,7 +375,7 @@ public class Shard extends RaftActor { } } - private void handleBatchedModifications(BatchedModifications batched) { + private void handleBatchedModifications(final BatchedModifications batched) { // This message is sent to prepare the modifications transaction directly on the Shard as an // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last // BatchedModifications message, the caller sets the ready flag in the message indicating @@ -504,15 +407,15 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(), newModifications.size(), leader); - for(BatchedModifications bm: newModifications) { + for (BatchedModifications bm : newModifications) { leader.forward(bm, getContext()); } } } } - private boolean failIfIsolatedLeader(ActorRef sender) { - if(isIsolatedLeader()) { + private boolean failIfIsolatedLeader(final ActorRef sender) { + if (isIsolatedLeader()) { sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( "Shard %s was the leader but has lost contact with all of its followers. Either all" + " other follower nodes are down or this node is isolated by a network partition.", @@ -552,13 +455,12 @@ public class Shard extends RaftActor { } } - private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) { + private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) { LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID()); boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { - commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this, - store.getSchemaContext()); + commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { @@ -598,9 +500,9 @@ public class Shard extends RaftActor { store.closeTransactionChain(closeTransactionChain.getIdentifier()); } - private void createTransaction(CreateTransaction createTransaction) { + private void createTransaction(final CreateTransaction createTransaction) { try { - if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && + if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY && failIfIsolatedLeader(getSender())) { return; } @@ -615,25 +517,12 @@ public class Shard extends RaftActor { } } - private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) { + private ActorRef createTransaction(final int transactionType, final TransactionIdentifier transactionId) { LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId); return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType), transactionId); } - private void commitWithNewTransaction(final BatchedModifications modification) { - ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID()); - modification.apply(tx.getSnapshot()); - try { - snapshotCohort.syncCommitTransaction(tx); - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - } catch (Exception e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("{}: Failed to commit", persistenceId(), e); - } - } - private void updateSchemaContext(final UpdateSchemaContext message) { updateSchemaContext(message.getSchemaContext()); } @@ -669,7 +558,7 @@ public class Shard extends RaftActor { getContext().parent().tell(new ActorInitialized(), getSelf()); // Being paranoid here - this method should only be called once but just in case... - if(txCommitTimeoutCheckSchedule == null) { + if (txCommitTimeoutCheckSchedule == null) { // Schedule a message to be periodically sent to check if the current in-progress // transaction should be expired and aborted. FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS); @@ -681,40 +570,14 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { - if (data instanceof DataTreeCandidateSupplier) { - if (clientActor == null) { - // No clientActor indicates a replica coming from the leader - try { - store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue()); - } catch (DataValidationFailedException | IOException e) { - LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); - } - } else { - // Replication consensus reached, proceed to commit - finishCommit(clientActor, identifier); + if (data instanceof Payload) { + try { + store.applyReplicatedPayload(identifier, (Payload)data); + } catch (DataValidationFailedException | IOException e) { + LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); } } else { - LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data, - data.getClass().getClassLoader()); - } - } - - private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) { - if(modification == null) { - LOG.error( - "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}", - persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null); - } else if(clientActor == null) { - // There's no clientActor to which to send a commit reply so we must be applying - // replicated state from the leader. - - // The only implementation we know of is BatchedModifications, which also carries a transaction - // identifier -- which we really need that. - Preconditions.checkArgument(modification instanceof BatchedModifications); - commitWithNewTransaction((BatchedModifications)modification); - } else { - // This must be the OK to commit after replication consensus. - finishCommit(clientActor, identifier); + LOG.error("{}: Unknown state for {} received {}", persistenceId(), identifier, data); } } @@ -727,7 +590,7 @@ public class Shard extends RaftActor { // If this actor is no longer the leader close all the transaction chains if (!isLeader) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug( "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader", persistenceId(), getId()); @@ -736,29 +599,29 @@ public class Shard extends RaftActor { store.closeAllTransactionChains(); } - if(hasLeader && !isIsolatedLeader()) { + if (hasLeader && !isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } @Override - protected void onLeaderChanged(String oldLeader, String newLeader) { + protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); boolean hasLeader = hasLeader(); - if(hasLeader && !isLeader()) { + if (hasLeader && !isLeader()) { // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); - if(leader != null) { - Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( - datastoreContext.getShardBatchedModificationCount()); + if (leader != null) { + Collection messagesToForward = commitCoordinator.convertPendingTransactionsToMessages( + datastoreContext.getShardBatchedModificationCount()); - if(!messagesToForward.isEmpty()) { + if (!messagesToForward.isEmpty()) { LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(), messagesToForward.size(), leader); - for(Object message: messagesToForward) { + for (Object message : messagesToForward) { leader.tell(message, self()); } } @@ -769,15 +632,15 @@ public class Shard extends RaftActor { } } - if(hasLeader && !isIsolatedLeader()) { + if (hasLeader && !isIsolatedLeader()) { messageRetrySupport.retryMessages(); } } @Override - protected void pauseLeader(Runnable operation) { + protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); - commitCoordinator.setRunOnPendingTransactionsComplete(operation); + store.setRunOnPendingTransactionsComplete(operation); } @Override @@ -815,9 +678,10 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; private SchemaContext schemaContext; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; + private TipProducingDataTree dataTree; private volatile boolean sealed; - protected AbstractBuilder(Class shardClass) { + protected AbstractBuilder(final Class shardClass) { this.shardClass = shardClass; } @@ -830,36 +694,42 @@ public class Shard extends RaftActor { return (T) this; } - public T id(ShardIdentifier id) { + public T id(final ShardIdentifier id) { checkSealed(); this.id = id; return self(); } - public T peerAddresses(Map peerAddresses) { + public T peerAddresses(final Map peerAddresses) { checkSealed(); this.peerAddresses = peerAddresses; return self(); } - public T datastoreContext(DatastoreContext datastoreContext) { + public T datastoreContext(final DatastoreContext datastoreContext) { checkSealed(); this.datastoreContext = datastoreContext; return self(); } - public T schemaContext(SchemaContext schemaContext) { + public T schemaContext(final SchemaContext schemaContext) { checkSealed(); this.schemaContext = schemaContext; return self(); } - public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) { + public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) { checkSealed(); this.restoreFromSnapshot = restoreFromSnapshot; return self(); } + public T dataTree(final TipProducingDataTree dataTree) { + checkSealed(); + this.dataTree = dataTree; + return self(); + } + public ShardIdentifier getId() { return id; } @@ -880,6 +750,10 @@ public class Shard extends RaftActor { return restoreFromSnapshot; } + public TipProducingDataTree getDataTree() { + return dataTree; + } + public TreeType getTreeType() { switch (datastoreContext.getLogicalStoreType()) { case CONFIGURATION: @@ -910,4 +784,8 @@ public class Shard extends RaftActor { super(Shard.class); } } + + Ticker ticker() { + return Ticker.systemTicker(); + } }