X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=ddb5989f096145cddd3b716afa8dfc1dce3311e3;hp=f3f8b8b193c78a5511c02ec6ede526f632c3f172;hb=9e59cc0d824e6752a7a3f3ba092abaaf3c1d4193;hpb=62eeafe8db02d51d0d8c47db2cc07bee23f618b8 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index f3f8b8b193..ddb5989f09 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -20,6 +20,7 @@ import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -35,7 +36,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; @@ -48,12 +48,11 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.cluster.raft.ConfigParams; -import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; @@ -68,14 +67,12 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.duration.FiniteDuration; - import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; /** * A Shard represents a portion of the logical data tree
@@ -85,8 +82,6 @@ import java.util.concurrent.TimeUnit; */ public class Shard extends RaftActor { - private static final ConfigParams configParams = new ShardConfigParams(); - public static final String DEFAULT_NAME = "default"; // The state of this Shard @@ -115,11 +110,18 @@ public class Shard extends RaftActor { private ActorRef createSnapshotTransaction; + /** + * Coordinates persistence recovery on startup. + */ + private ShardRecoveryCoordinator recoveryCoordinator; + private List currentLogRecoveryBatch; + private final Map transactionChains = new HashMap<>(); - private Shard(ShardIdentifier name, Map peerAddresses, + protected Shard(ShardIdentifier name, Map peerAddresses, DatastoreContext datastoreContext, SchemaContext schemaContext) { - super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams)); + super(name.toString(), mapPeerAddresses(peerAddresses), + Optional.of(datastoreContext.getShardRaftConfig())); this.name = name; this.datastoreContext = datastoreContext; @@ -172,8 +174,11 @@ public class Shard extends RaftActor { } @Override public void onReceiveRecover(Object message) { - LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), - getSender()); + if(LOG.isDebugEnabled()) { + LOG.debug("onReceiveRecover: Received message {} from {}", + message.getClass().toString(), + getSender()); + } if (message instanceof RecoveryFailure){ LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause"); @@ -183,8 +188,11 @@ public class Shard extends RaftActor { } @Override public void onReceiveCommand(Object message) { - LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(), - getSender()); + if(LOG.isDebugEnabled()) { + LOG.debug("onReceiveCommand: Received message {} from {}", + message.getClass().toString(), + getSender()); + } if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { // This must be for install snapshot. Don't want to open this up and trigger @@ -299,7 +307,9 @@ public class Shard extends RaftActor { ShardTransactionIdentifier.builder() .remoteTransactionId(remoteTransactionId) .build(); - LOG.debug("Creating transaction : {} ", transactionId); + if(LOG.isDebugEnabled()) { + LOG.debug("Creating transaction : {} ", transactionId); + } ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, transactionChainId); @@ -326,40 +336,22 @@ public class Shard extends RaftActor { DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(serialized); if (cohort == null) { - LOG.debug( - "Could not find cohort for modification : {}. Writing modification using a new transaction", - modification); - DOMStoreWriteTransaction transaction = - store.newWriteOnlyTransaction(); - - LOG.debug("Created new transaction {}", transaction.getIdentifier().toString()); - - modification.apply(transaction); - try { - syncCommitTransaction(transaction); - } catch (InterruptedException | ExecutionException e) { - shardMBean.incrementFailedTransactionsCount(); - LOG.error("Failed to commit", e); - return; - } - //we want to just apply the recovery commit and return - shardMBean.incrementCommittedTransactionCount(); + // If there's no cached cohort then we must be applying replicated state. + commitWithNewTransaction(serialized); return; } - - if(sender == null){ + if(sender == null) { LOG.error("Commit failed. Sender cannot be null"); return; } - final ListenableFuture future = cohort.commit(); - final ActorRef self = getSelf(); + ListenableFuture future = cohort.commit(); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Void v) { - sender.tell(new CommitTransactionReply().toSerializable(), self); + sender.tell(new CommitTransactionReply().toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); } @@ -368,12 +360,24 @@ public class Shard extends RaftActor { public void onFailure(Throwable t) { LOG.error(t, "An exception happened during commit"); shardMBean.incrementFailedTransactionsCount(); - sender.tell(new akka.actor.Status.Failure(t), self); + sender.tell(new akka.actor.Status.Failure(t), getSelf()); } }); } + private void commitWithNewTransaction(Object modification) { + DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); + MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx); + try { + syncCommitTransaction(tx); + shardMBean.incrementCommittedTransactionCount(); + } catch (InterruptedException | ExecutionException e) { + shardMBean.incrementFailedTransactionsCount(); + LOG.error(e, "Failed to commit"); + } + } + private void handleForwardedCommit(ForwardedCommitTransaction message) { Object serializedModification = message.getModification().toSerializable(); @@ -402,8 +406,10 @@ public class Shard extends RaftActor { private void registerChangeListener( RegisterChangeListener registerChangeListener) { - LOG.debug("registerDataChangeListener for {}", registerChangeListener - .getPath()); + if(LOG.isDebugEnabled()) { + LOG.debug("registerDataChangeListener for {}", registerChangeListener + .getPath()); + } ActorSelection dataChangeListenerPath = getContext() @@ -431,48 +437,118 @@ public class Shard extends RaftActor { getContext().actorOf( DataChangeListenerRegistration.props(registration)); - LOG.debug( - "registerDataChangeListener sending reply, listenerRegistrationPath = {} " - , listenerRegistration.path().toString()); + if(LOG.isDebugEnabled()) { + LOG.debug( + "registerDataChangeListener sending reply, listenerRegistrationPath = {} " + , listenerRegistration.path().toString()); + } getSender() .tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } - private void createTransactionChain() { - DOMStoreTransactionChain chain = store.createTransactionChain(); - ActorRef transactionChain = getContext().actorOf( - ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean)); - getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(), - getSelf()); - } - private boolean isMetricsCaptureEnabled(){ CommonConfig config = new CommonConfig(getContext().system().settings().config()); return config.isMetricCaptureEnabled(); } - @Override protected void applyState(ActorRef clientActor, String identifier, - Object data) { + @Override + protected + void startLogRecoveryBatch(int maxBatchSize) { + currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), maxBatchSize); + } + } + + @Override + protected void appendRecoveredLogEntry(Payload data) { + if (data instanceof CompositeModificationPayload) { + currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); + } else { + LOG.error("Unknown state received {} during recovery", data); + } + } + + @Override + protected void applyRecoverySnapshot(ByteString snapshot) { + if(recoveryCoordinator == null) { + recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); + } + + recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : submitted recovery sbapshot", persistenceId()); + } + } + + @Override + protected void applyCurrentLogRecoveryBatch() { + if(recoveryCoordinator == null) { + recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); + } + + recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(), + currentLogRecoveryBatch.size()); + } + } + + @Override + protected void onRecoveryComplete() { + if(recoveryCoordinator != null) { + Collection txList = recoveryCoordinator.getTransactions(); + + if(LOG.isDebugEnabled()) { + LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), txList.size()); + } + + for(DOMStoreWriteTransaction tx: txList) { + try { + syncCommitTransaction(tx); + shardMBean.incrementCommittedTransactionCount(); + } catch (InterruptedException | ExecutionException e) { + shardMBean.incrementFailedTransactionsCount(); + LOG.error(e, "Failed to commit"); + } + } + } + + recoveryCoordinator = null; + currentLogRecoveryBatch = null; + updateJournalStats(); + } + + @Override + protected void applyState(ActorRef clientActor, String identifier, Object data) { if (data instanceof CompositeModificationPayload) { - Object modification = - ((CompositeModificationPayload) data).getModification(); + Object modification = ((CompositeModificationPayload) data).getModification(); if (modification != null) { commit(clientActor, modification); } else { LOG.error( "modification is null - this is very unexpected, clientActor = {}, identifier = {}", - identifier, clientActor.path().toString()); + identifier, clientActor != null ? clientActor.path().toString() : null); } } else { - LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader()); + LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", + data, data.getClass().getClassLoader(), + CompositeModificationPayload.class.getClassLoader()); } - // Update stats + updateJournalStats(); + + } + + private void updateJournalStats() { ReplicatedLogEntry lastLogEntry = getLastLogEntry(); if (lastLogEntry != null) { @@ -482,10 +558,10 @@ public class Shard extends RaftActor { shardMBean.setCommitIndex(getCommitIndex()); shardMBean.setLastApplied(getLastApplied()); - } - @Override protected void createSnapshot() { + @Override + protected void createSnapshot() { if (createSnapshotTransaction == null) { // Create a transaction. We are really going to treat the transaction as a worker @@ -500,7 +576,9 @@ public class Shard extends RaftActor { } } - @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) { + @VisibleForTesting + @Override + protected void applySnapshot(ByteString snapshot) { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -531,14 +609,17 @@ public class Shard extends RaftActor { .tell(new EnableNotification(isLeader()), getSelf()); } - shardMBean.setRaftState(getRaftState().name()); shardMBean.setCurrentTerm(getCurrentTerm()); // If this actor is no longer the leader close all the transaction chains if(!isLeader()){ for(Map.Entry entry : transactionChains.entrySet()){ - LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId()); + if(LOG.isDebugEnabled()) { + LOG.debug( + "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", + entry.getKey(), getId()); + } entry.getValue().close(); } @@ -554,16 +635,6 @@ public class Shard extends RaftActor { return this.name.toString(); } - - private static class ShardConfigParams extends DefaultConfigParamsImpl { - public static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(500, TimeUnit.MILLISECONDS); - - @Override public FiniteDuration getHeartBeatInterval() { - return HEART_BEAT_INTERVAL; - } - } - private static class ShardCreator implements Creator { private static final long serialVersionUID = 1L; @@ -587,20 +658,24 @@ public class Shard extends RaftActor { } } - @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException { + @VisibleForTesting + NormalizedNode readStore(YangInstanceIdentifier id) + throws ExecutionException, InterruptedException { DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); CheckedFuture>, ReadFailedException> future = - transaction.read(YangInstanceIdentifier.builder().build()); + transaction.read(id); - NormalizedNode node = future.get().get(); + Optional> optional = future.get(); + NormalizedNode node = optional.isPresent()? optional.get() : null; transaction.close(); return node; } - @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node) + @VisibleForTesting + void writeToStore(YangInstanceIdentifier id, NormalizedNode node) throws ExecutionException, InterruptedException { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); @@ -609,4 +684,8 @@ public class Shard extends RaftActor { syncCommitTransaction(transaction); } + @VisibleForTesting + ShardStats getShardMBean() { + return shardMBean; + } }