X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=9cd758ba30fdb94e85cd1703d99a8e0c55a50a17;hb=d13a643d14259975c33793335b73a7ad26c470eb;hp=1bf32e7fca7191236afeeb2c7ed0048b28db499c;hpb=3def254d3e8d2f24038ddfb7d1b1749ca2135fe2;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 1bf32e7fca..9cd758ba30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -8,6 +8,23 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Creator; +import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -32,29 +49,29 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; @@ -65,25 +82,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.Cancellable; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Creator; -import akka.persistence.RecoveryFailure; -import akka.serialization.Serialization; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; /** * A Shard represents a portion of the logical data tree
@@ -93,13 +91,12 @@ import com.google.protobuf.InvalidProtocolBufferException; */ public class Shard extends RaftActor { - private static final int HELIUM_1_TX_VERSION = 1; - - private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); + private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; - public static final String DEFAULT_NAME = "default"; + @VisibleForTesting + static final String DEFAULT_NAME = "default"; // The state of this Shard private final InMemoryDOMDataStore store; @@ -123,8 +120,6 @@ public class Shard extends RaftActor { private SchemaContext schemaContext; - private ActorRef createSnapshotTransaction; - private int createSnapshotTransactionCounter; private final ShardCommitCoordinator commitCoordinator; @@ -133,6 +128,8 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; + private final Optional roleChangeNotifier; + /** * Coordinates persistence recovery on startup. */ @@ -141,8 +138,8 @@ public class Shard extends RaftActor { private final Map transactionChains = new HashMap<>(); - protected Shard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + protected Shard(final ShardIdentifier name, final Map peerAddresses, + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); @@ -162,7 +159,6 @@ public class Shard extends RaftActor { shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); - shardMBean.setDataStoreExecutor(store.getDomStoreExecutor()); shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager()); if (isMetricsCaptureEnabled()) { @@ -174,10 +170,13 @@ public class Shard extends RaftActor { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + + // create a notifier actor for each cluster member + roleChangeNotifier = createRoleChangeNotifier(name.toString()); } private static Map mapPeerAddresses( - Map peerAddresses) { + final Map peerAddresses) { Map map = new HashMap<>(); for (Map.Entry entry : peerAddresses @@ -190,7 +189,7 @@ public class Shard extends RaftActor { public static Props props(final ShardIdentifier name, final Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); @@ -199,6 +198,12 @@ public class Shard extends RaftActor { return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext)); } + private Optional createRoleChangeNotifier(String shardId) { + ActorRef shardRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); + return Optional.of(shardRoleChangeNotifier); + } + @Override public void postStop() { super.postStop(); @@ -209,7 +214,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveRecover(Object message) throws Exception { + public void onReceiveRecover(final Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), @@ -228,14 +233,12 @@ public class Shard extends RaftActor { } @Override - public void onReceiveCommand(Object message) throws Exception { + public void onReceiveCommand(final Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); } - if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - handleReadDataReply(message); - } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { + if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { handleCreateTransaction(message); } else if(message instanceof ForwardedReadyTransaction) { handleForwardedReadyTransaction((ForwardedReadyTransaction)message); @@ -262,6 +265,11 @@ public class Shard extends RaftActor { } } + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { @@ -275,7 +283,7 @@ public class Shard extends RaftActor { } } - private void handleCommitTransaction(CommitTransaction commit) { + private void handleCommitTransaction(final CommitTransaction commit) { final String transactionID = commit.getTransactionID(); LOG.debug("Committing transaction {}", transactionID); @@ -306,9 +314,15 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().preCommit().get(); - Shard.this.persistData(getSender(), transactionID, - new CompositeModificationPayload(cohortEntry.getModification().toSerializable())); - } catch (InterruptedException | ExecutionException e) { + // If we do not have any followers and we are not using persistence we can + // apply modification to the state immediately + if(!hasFollowers() && !persistence().isRecoveryApplicable()){ + applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); + } else { + Shard.this.persistData(getSender(), transactionID, + new ModificationPayload(cohortEntry.getModification())); + } + } catch (InterruptedException | ExecutionException | IOException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); shardMBean.incrementFailedTransactionsCount(); @@ -333,7 +347,7 @@ public class Shard extends RaftActor { cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { commitWithNewTransaction(cohortEntry.getModification()); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication // took so long to complete such that the cohort entry was expired from the cache. @@ -355,7 +369,7 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. cohortEntry.getCohort().commit().get(); - sender.tell(COMMIT_TRANSACTION_REPLY, getSelf()); + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); @@ -370,7 +384,7 @@ public class Shard extends RaftActor { commitCoordinator.currentTransactionComplete(transactionID, true); } - private void handleCanCommitTransaction(CanCommitTransaction canCommit) { + private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { LOG.debug("Can committing transaction {}", canCommit.getTransactionID()); commitCoordinator.handleCanCommit(canCommit, getSender(), self()); } @@ -391,7 +405,7 @@ public class Shard extends RaftActor { // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. ActorRef replyActorPath = self(); - if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) { + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); @@ -403,11 +417,11 @@ public class Shard extends RaftActor { readyTransactionReply, getSelf()); } - private void handleAbortTransaction(AbortTransaction abort) { + private void handleAbortTransaction(final AbortTransaction abort) { doAbortTransaction(abort.getTransactionID(), getSender()); } - private void doAbortTransaction(String transactionID, final ActorRef sender) { + void doAbortTransaction(final String transactionID, final ActorRef sender) { final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { LOG.debug("Aborting transaction {}", transactionID); @@ -422,16 +436,16 @@ public class Shard extends RaftActor { Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Void v) { + public void onSuccess(final Void v) { shardMBean.incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self); } } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable t) { LOG.error(t, "An exception happened during abort"); if(sender != null) { @@ -442,7 +456,7 @@ public class Shard extends RaftActor { } } - private void handleCreateTransaction(Object message) { + private void handleCreateTransaction(final Object message) { if (isLeader()) { createTransaction(CreateTransaction.fromSerializable(message)); } else if (getLeader() != null) { @@ -455,21 +469,7 @@ public class Shard extends RaftActor { } } - private void handleReadDataReply(Object message) { - // This must be for install snapshot. Don't want to open this up and trigger - // deSerialization - - self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), - self()); - - createSnapshotTransaction = null; - - // Send a PoisonPill instead of sending close transaction because we do not really need - // a response - getSender().tell(PoisonPill.getInstance(), self()); - } - - private void closeTransactionChain(CloseTransactionChain closeTransactionChain) { + private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { DOMStoreTransactionChain chain = transactionChains.remove(closeTransactionChain.getTransactionChainId()); @@ -479,7 +479,8 @@ public class Shard extends RaftActor { } private ActorRef createTypedTransactionActor(int transactionType, - ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { + ShardTransactionIdentifier transactionId, String transactionChainId, + short clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -492,8 +493,8 @@ public class Shard extends RaftActor { } } - if(this.schemaContext == null){ - throw new NullPointerException("schemaContext should not be null"); + if(this.schemaContext == null) { + throw new IllegalStateException("SchemaContext is not set"); } if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { @@ -534,41 +535,44 @@ public class Shard extends RaftActor { } private void createTransaction(CreateTransaction createTransaction) { - createTransaction(createTransaction.getTransactionType(), - createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), - createTransaction.getClientVersion()); + try { + ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(), + createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), + createTransaction.getVersion()); + + getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), + createTransaction.getTransactionId()).toSerializable(), getSelf()); + } catch (Exception e) { + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } } private ActorRef createTransaction(int transactionType, String remoteTransactionId, - String transactionChainId, int clientVersion) { + String transactionChainId, short clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() .remoteTransactionId(remoteTransactionId) .build(); + if(LOG.isDebugEnabled()) { LOG.debug("Creating transaction : {} ", transactionId); } + ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, transactionChainId, clientVersion); - getSender() - .tell(new CreateTransactionReply( - Serialization.serializedActorPath(transactionActor), - remoteTransactionId).toSerializable(), - getSelf()); - return transactionActor; } - private void syncCommitTransaction(DOMStoreWriteTransaction transaction) + private void syncCommitTransaction(final DOMStoreWriteTransaction transaction) throws ExecutionException, InterruptedException { DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); commitCohort.preCommit().get(); commitCohort.commit().get(); } - private void commitWithNewTransaction(Modification modification) { + private void commitWithNewTransaction(final Modification modification) { DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); modification.apply(tx); try { @@ -581,18 +585,18 @@ public class Shard extends RaftActor { } } - private void updateSchemaContext(UpdateSchemaContext message) { + private void updateSchemaContext(final UpdateSchemaContext message) { this.schemaContext = message.getSchemaContext(); updateSchemaContext(message.getSchemaContext()); store.onGlobalContextUpdated(message.getSchemaContext()); } @VisibleForTesting - void updateSchemaContext(SchemaContext schemaContext) { + void updateSchemaContext(final SchemaContext schemaContext) { store.onGlobalContextUpdated(schemaContext); } - private void registerChangeListener(RegisterChangeListener registerChangeListener) { + private void registerChangeListener(final RegisterChangeListener registerChangeListener) { LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath()); @@ -615,12 +619,12 @@ public class Shard extends RaftActor { LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } private ListenerRegistration>> doChangeListenerRegistration( - RegisterChangeListener registerChangeListener) { + final RegisterChangeListener registerChangeListener) { ActorSelection dataChangeListenerPath = getContext().system().actorSelection( registerChangeListener.getDataChangeListenerPath()); @@ -635,7 +639,7 @@ public class Shard extends RaftActor { dataChangeListeners.add(dataChangeListenerPath); AsyncDataChangeListener> listener = - new DataChangeListenerProxy(schemaContext, dataChangeListenerPath); + new DataChangeListenerProxy(dataChangeListenerPath); LOG.debug("Registering for path {}", registerChangeListener.getPath()); @@ -650,7 +654,7 @@ public class Shard extends RaftActor { @Override protected - void startLogRecoveryBatch(int maxBatchSize) { + void startLogRecoveryBatch(final int maxBatchSize) { currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); if(LOG.isDebugEnabled()) { @@ -659,21 +663,29 @@ public class Shard extends RaftActor { } @Override - protected void appendRecoveredLogEntry(Payload data) { - if (data instanceof CompositeModificationPayload) { + protected void appendRecoveredLogEntry(final Payload data) { + if(data instanceof ModificationPayload) { + try { + currentLogRecoveryBatch.add(((ModificationPayload) data).getModification()); + } catch (ClassNotFoundException | IOException e) { + LOG.error(e, "Error extracting ModificationPayload"); + } + } else if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); + } else if (data instanceof CompositeModificationByteStringPayload) { + currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); } else { LOG.error("Unknown state received {} during recovery", data); } } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { + protected void applyRecoverySnapshot(final byte[] snapshotBytes) { if(recoveryCoordinator == null) { recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); } - recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction()); + recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction()); if(LOG.isDebugEnabled()) { LOG.debug("{} : submitted recovery sbapshot", persistenceId()); @@ -733,24 +745,23 @@ public class Shard extends RaftActor { } @Override - protected void applyState(ActorRef clientActor, String identifier, Object data) { + protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { - if (data instanceof CompositeModificationPayload) { + if(data instanceof ModificationPayload) { + try { + applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); + } catch (ClassNotFoundException | IOException e) { + LOG.error(e, "Error extracting ModificationPayload"); + } + } + else if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); - if(modification == null) { - LOG.error( - "modification is null - this is very unexpected, clientActor = {}, identifier = {}", - identifier, clientActor != null ? clientActor.path().toString() : null); - } else if(clientActor == null) { - // There's no clientActor to which to send a commit reply so we must be applying - // replicated state from the leader. - commitWithNewTransaction(MutableCompositeModification.fromSerializable( - modification, schemaContext)); - } else { - // This must be the OK to commit after replication consensus. - finishCommit(clientActor, identifier); - } + applyModificationToState(clientActor, identifier, modification); + } else if(data instanceof CompositeModificationByteStringPayload ){ + Object modification = ((CompositeModificationByteStringPayload) data).getModification(); + + applyModificationToState(clientActor, identifier, modification); } else { LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), @@ -761,6 +772,21 @@ public class Shard extends RaftActor { } + private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { + if(modification == null) { + LOG.error( + "modification is null - this is very unexpected, clientActor = {}, identifier = {}", + identifier, clientActor != null ? clientActor.path().toString() : null); + } else if(clientActor == null) { + // There's no clientActor to which to send a commit reply so we must be applying + // replicated state from the leader. + commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification)); + } else { + // This must be the OK to commit after replication consensus. + finishCommit(clientActor, identifier); + } + } + private void updateJournalStats() { ReplicatedLogEntry lastLogEntry = getLastLogEntry(); @@ -771,28 +797,26 @@ public class Shard extends RaftActor { shardMBean.setCommitIndex(getCommitIndex()); shardMBean.setLastApplied(getLastApplied()); + shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize()); } @Override protected void createSnapshot() { - if (createSnapshotTransaction == null) { + // Create a transaction actor. We are really going to treat the transaction as a worker + // so that this actor does not get block building the snapshot. THe transaction actor will + // after processing the CreateSnapshot message. - // Create a transaction. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot - createSnapshotTransaction = createTransaction( + ActorRef createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", - CreateTransaction.CURRENT_CLIENT_VERSION); + DataStoreVersions.CURRENT_VERSION); - createSnapshotTransaction.tell( - new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); - - } + createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self()); } @VisibleForTesting @Override - protected void applySnapshot(ByteString snapshot) { + protected void applySnapshot(final byte[] snapshotBytes) { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -800,17 +824,16 @@ public class Shard extends RaftActor { LOG.info("Applying snapshot"); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); - NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext) - .decode(serializedNode); + + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); // delete everything first - transaction.delete(YangInstanceIdentifier.builder().build()); + transaction.delete(DATASTORE_ROOT); // Add everything from the remote node back - transaction.write(YangInstanceIdentifier.builder().build(), node); + transaction.write(DATASTORE_ROOT, node); syncCommitTransaction(transaction); - } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred when applying snapshot"); } finally { LOG.info("Done applying snapshot"); @@ -857,7 +880,7 @@ public class Shard extends RaftActor { return dataPersistenceProvider; } - @Override protected void onLeaderChanged(String oldLeader, String newLeader) { + @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.setLeader(newLeader); } @@ -879,8 +902,8 @@ public class Shard extends RaftActor { final DatastoreContext datastoreContext; final SchemaContext schemaContext; - ShardCreator(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + ShardCreator(final ShardIdentifier name, final Map peerAddresses, + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { this.name = name; this.peerAddresses = peerAddresses; this.datastoreContext = datastoreContext; @@ -913,11 +936,11 @@ public class Shard extends RaftActor { private volatile ListenerRegistration>> delegate; - DelayedListenerRegistration(RegisterChangeListener registerChangeListener) { + DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) { this.registerChangeListener = registerChangeListener; } - void setDelegate( ListenerRegistration>> registration) { this.delegate = registration; }