X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=af16d02eea2909ee75e83e2b62dc58bc633b7031;hp=1bf32e7fca7191236afeeb2c7ed0048b28db499c;hb=7f2ecd54dd3eb09af469a610fdd541b48ed95b80;hpb=b525d4eee599fce34a98e8e7e06d513b895c81c8 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 1bf32e7fca..af16d02eea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -8,6 +8,25 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Creator; +import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -46,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -65,25 +85,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.Cancellable; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Creator; -import akka.persistence.RecoveryFailure; -import akka.serialization.Serialization; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; /** * A Shard represents a portion of the logical data tree
@@ -93,8 +94,6 @@ import com.google.protobuf.InvalidProtocolBufferException; */ public class Shard extends RaftActor { - private static final int HELIUM_1_TX_VERSION = 1; - private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable(); private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; @@ -133,6 +132,8 @@ public class Shard extends RaftActor { private Cancellable txCommitTimeoutCheckSchedule; + private Optional roleChangeNotifier; + /** * Coordinates persistence recovery on startup. */ @@ -141,8 +142,8 @@ public class Shard extends RaftActor { private final Map transactionChains = new HashMap<>(); - protected Shard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + protected Shard(final ShardIdentifier name, final Map peerAddresses, + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); @@ -162,7 +163,6 @@ public class Shard extends RaftActor { shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); - shardMBean.setDataStoreExecutor(store.getDomStoreExecutor()); shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager()); if (isMetricsCaptureEnabled()) { @@ -174,10 +174,13 @@ public class Shard extends RaftActor { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + + // create a notifier actor for each cluster member + roleChangeNotifier = createRoleChangeNotifier(name.toString()); } private static Map mapPeerAddresses( - Map peerAddresses) { + final Map peerAddresses) { Map map = new HashMap<>(); for (Map.Entry entry : peerAddresses @@ -190,7 +193,7 @@ public class Shard extends RaftActor { public static Props props(final ShardIdentifier name, final Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); @@ -199,6 +202,12 @@ public class Shard extends RaftActor { return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext)); } + private Optional createRoleChangeNotifier(String shardId) { + ActorRef shardRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); + return Optional.of(shardRoleChangeNotifier); + } + @Override public void postStop() { super.postStop(); @@ -209,7 +218,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveRecover(Object message) throws Exception { + public void onReceiveRecover(final Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), @@ -228,7 +237,7 @@ public class Shard extends RaftActor { } @Override - public void onReceiveCommand(Object message) throws Exception { + public void onReceiveCommand(final Object message) throws Exception { if(LOG.isDebugEnabled()) { LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender()); } @@ -262,6 +271,11 @@ public class Shard extends RaftActor { } } + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { @@ -275,7 +289,7 @@ public class Shard extends RaftActor { } } - private void handleCommitTransaction(CommitTransaction commit) { + private void handleCommitTransaction(final CommitTransaction commit) { final String transactionID = commit.getTransactionID(); LOG.debug("Committing transaction {}", transactionID); @@ -370,7 +384,7 @@ public class Shard extends RaftActor { commitCoordinator.currentTransactionComplete(transactionID, true); } - private void handleCanCommitTransaction(CanCommitTransaction canCommit) { + private void handleCanCommitTransaction(final CanCommitTransaction canCommit) { LOG.debug("Can committing transaction {}", canCommit.getTransactionID()); commitCoordinator.handleCanCommit(canCommit, getSender(), self()); } @@ -391,7 +405,7 @@ public class Shard extends RaftActor { // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. ActorRef replyActorPath = self(); - if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) { + if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( ready.getTransactionID())); @@ -403,11 +417,11 @@ public class Shard extends RaftActor { readyTransactionReply, getSelf()); } - private void handleAbortTransaction(AbortTransaction abort) { + private void handleAbortTransaction(final AbortTransaction abort) { doAbortTransaction(abort.getTransactionID(), getSender()); } - private void doAbortTransaction(String transactionID, final ActorRef sender) { + private void doAbortTransaction(final String transactionID, final ActorRef sender) { final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); if(cohortEntry != null) { LOG.debug("Aborting transaction {}", transactionID); @@ -422,7 +436,7 @@ public class Shard extends RaftActor { Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Void v) { + public void onSuccess(final Void v) { shardMBean.incrementAbortTransactionsCount(); if(sender != null) { @@ -431,7 +445,7 @@ public class Shard extends RaftActor { } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable t) { LOG.error(t, "An exception happened during abort"); if(sender != null) { @@ -442,7 +456,7 @@ public class Shard extends RaftActor { } } - private void handleCreateTransaction(Object message) { + private void handleCreateTransaction(final Object message) { if (isLeader()) { createTransaction(CreateTransaction.fromSerializable(message)); } else if (getLeader() != null) { @@ -455,7 +469,7 @@ public class Shard extends RaftActor { } } - private void handleReadDataReply(Object message) { + private void handleReadDataReply(final Object message) { // This must be for install snapshot. Don't want to open this up and trigger // deSerialization @@ -469,7 +483,7 @@ public class Shard extends RaftActor { getSender().tell(PoisonPill.getInstance(), self()); } - private void closeTransactionChain(CloseTransactionChain closeTransactionChain) { + private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { DOMStoreTransactionChain chain = transactionChains.remove(closeTransactionChain.getTransactionChainId()); @@ -492,8 +506,8 @@ public class Shard extends RaftActor { } } - if(this.schemaContext == null){ - throw new NullPointerException("schemaContext should not be null"); + if(this.schemaContext == null) { + throw new IllegalStateException("SchemaContext is not set"); } if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { @@ -534,9 +548,16 @@ public class Shard extends RaftActor { } private void createTransaction(CreateTransaction createTransaction) { - createTransaction(createTransaction.getTransactionType(), - createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), - createTransaction.getClientVersion()); + try { + ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(), + createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), + createTransaction.getVersion()); + + getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), + createTransaction.getTransactionId()).toSerializable(), getSelf()); + } catch (Exception e) { + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } } private ActorRef createTransaction(int transactionType, String remoteTransactionId, @@ -546,29 +567,25 @@ public class Shard extends RaftActor { ShardTransactionIdentifier.builder() .remoteTransactionId(remoteTransactionId) .build(); + if(LOG.isDebugEnabled()) { LOG.debug("Creating transaction : {} ", transactionId); } + ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, transactionChainId, clientVersion); - getSender() - .tell(new CreateTransactionReply( - Serialization.serializedActorPath(transactionActor), - remoteTransactionId).toSerializable(), - getSelf()); - return transactionActor; } - private void syncCommitTransaction(DOMStoreWriteTransaction transaction) + private void syncCommitTransaction(final DOMStoreWriteTransaction transaction) throws ExecutionException, InterruptedException { DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); commitCohort.preCommit().get(); commitCohort.commit().get(); } - private void commitWithNewTransaction(Modification modification) { + private void commitWithNewTransaction(final Modification modification) { DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); modification.apply(tx); try { @@ -581,18 +598,18 @@ public class Shard extends RaftActor { } } - private void updateSchemaContext(UpdateSchemaContext message) { + private void updateSchemaContext(final UpdateSchemaContext message) { this.schemaContext = message.getSchemaContext(); updateSchemaContext(message.getSchemaContext()); store.onGlobalContextUpdated(message.getSchemaContext()); } @VisibleForTesting - void updateSchemaContext(SchemaContext schemaContext) { + void updateSchemaContext(final SchemaContext schemaContext) { store.onGlobalContextUpdated(schemaContext); } - private void registerChangeListener(RegisterChangeListener registerChangeListener) { + private void registerChangeListener(final RegisterChangeListener registerChangeListener) { LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath()); @@ -615,12 +632,12 @@ public class Shard extends RaftActor { LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } private ListenerRegistration>> doChangeListenerRegistration( - RegisterChangeListener registerChangeListener) { + final RegisterChangeListener registerChangeListener) { ActorSelection dataChangeListenerPath = getContext().system().actorSelection( registerChangeListener.getDataChangeListenerPath()); @@ -650,7 +667,7 @@ public class Shard extends RaftActor { @Override protected - void startLogRecoveryBatch(int maxBatchSize) { + void startLogRecoveryBatch(final int maxBatchSize) { currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); if(LOG.isDebugEnabled()) { @@ -659,7 +676,7 @@ public class Shard extends RaftActor { } @Override - protected void appendRecoveredLogEntry(Payload data) { + protected void appendRecoveredLogEntry(final Payload data) { if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); } else { @@ -668,7 +685,7 @@ public class Shard extends RaftActor { } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { + protected void applyRecoverySnapshot(final ByteString snapshot) { if(recoveryCoordinator == null) { recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext); } @@ -733,7 +750,7 @@ public class Shard extends RaftActor { } @Override - protected void applyState(ActorRef clientActor, String identifier, Object data) { + protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); @@ -771,6 +788,7 @@ public class Shard extends RaftActor { shardMBean.setCommitIndex(getCommitIndex()); shardMBean.setLastApplied(getLastApplied()); + shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize()); } @Override @@ -782,7 +800,7 @@ public class Shard extends RaftActor { createSnapshotTransaction = createTransaction( TransactionProxy.TransactionType.READ_ONLY.ordinal(), "createSnapshot" + ++createSnapshotTransactionCounter, "", - CreateTransaction.CURRENT_CLIENT_VERSION); + CreateTransaction.CURRENT_VERSION); createSnapshotTransaction.tell( new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self()); @@ -792,7 +810,7 @@ public class Shard extends RaftActor { @VisibleForTesting @Override - protected void applySnapshot(ByteString snapshot) { + protected void applySnapshot(final ByteString snapshot) { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower @@ -857,7 +875,7 @@ public class Shard extends RaftActor { return dataPersistenceProvider; } - @Override protected void onLeaderChanged(String oldLeader, String newLeader) { + @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.setLeader(newLeader); } @@ -879,8 +897,8 @@ public class Shard extends RaftActor { final DatastoreContext datastoreContext; final SchemaContext schemaContext; - ShardCreator(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + ShardCreator(final ShardIdentifier name, final Map peerAddresses, + final DatastoreContext datastoreContext, final SchemaContext schemaContext) { this.name = name; this.peerAddresses = peerAddresses; this.datastoreContext = datastoreContext; @@ -913,11 +931,11 @@ public class Shard extends RaftActor { private volatile ListenerRegistration>> delegate; - DelayedListenerRegistration(RegisterChangeListener registerChangeListener) { + DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) { this.registerChangeListener = registerChangeListener; } - void setDelegate( ListenerRegistration>> registration) { this.delegate = registration; }