X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=9cd758ba30fdb94e85cd1703d99a8e0c55a50a17;hb=12fa2670527dfe66df922dd2feb2001eec3f60ba;hp=dea377a810cb73be01cf4c17dab93c4c2b476c1b;hpb=3f2221486de63178fbfbb43508ce9466c0b23b73;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index dea377a810..9cd758ba30 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; -import akka.actor.PoisonPill; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -25,8 +24,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -52,13 +49,12 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -66,18 +62,16 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; @@ -97,6 +91,8 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { + private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); + private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; @VisibleForTesting @@ -124,8 +120,6 @@ public class Shard extends RaftActor { private SchemaContext schemaContext; - private ActorRef createSnapshotTransaction; - private int createSnapshotTransactionCounter; private final ShardCommitCoordinator commitCoordinator; @@ -244,9 +238,7 @@ public class Shard extends RaftActor { LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); } - if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - handleReadDataReply(message); - } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { + if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { handleCreateTransaction(message); } else if(message instanceof ForwardedReadyTransaction) { handleForwardedReadyTransaction((ForwardedReadyTransaction)message); @@ -477,20 +469,6 @@ public class Shard extends RaftActor { } } - private void handleReadDataReply(final Object message) { - // This must be for install snapshot. Don't want to open this up and trigger - // deSerialization - - self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)), - self()); - - createSnapshotTransaction = null; - - // Send a PoisonPill instead of sending close transaction because we do not really need - // a response - getSender().tell(PoisonPill.getInstance(), self()); - } - private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { DOMStoreTransactionChain chain = transactionChains.remove(closeTransactionChain.getTransactionChainId()); @@ -702,12 +680,12 @@ public class Shard extends RaftActor { } @Override - protected void applyRecoverySnapshot(final ByteString snapshot) { + protected void applyRecoverySnapshot(final byte[] snapshotBytes) { if(recoveryCoordinator == null) { recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); } - recoveryCoordinator.submit(snapshot, store.newWriteOnlyTransaction()); + recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction()); if(LOG.isDebugEnabled()) { LOG.debug("{} : submitted recovery sbapshot", persistenceId()); @@ -824,24 +802,21 @@ public class Shard extends RaftActor { @Override protected void createSnapshot() { - if (createSnapshotTransaction == null) { + // Create a transaction actor. We are really going to treat the transaction as a worker + // so that this actor does not get block building the snapshot. THe transaction actor will + // after processing the CreateSnapshot message. - // Create a transaction. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot - createSnapshotTransaction = createTransaction( + ActorRef createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", DataStoreVersions.CURRENT_VERSION); - createSnapshotTransaction.tell( - new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); - - } + createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self()); } @VisibleForTesting @Override - protected void applySnapshot(final ByteString snapshot) { + protected void applySnapshot(final byte[] snapshotBytes) { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -849,17 +824,16 @@ public class Shard extends RaftActor { LOG.info("Applying snapshot"); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); - NormalizedNode node = new NormalizedNodeToNodeCodec(schemaContext) - .decode(serializedNode); + + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); // delete everything first - transaction.delete(YangInstanceIdentifier.builder().build()); + transaction.delete(DATASTORE_ROOT); // Add everything from the remote node back - transaction.write(YangInstanceIdentifier.builder().build(), node); + transaction.write(DATASTORE_ROOT, node); syncCommitTransaction(transaction); - } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred when applying snapshot"); } finally { LOG.info("Done applying snapshot");