X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=402bf4822b01a1f56a12ebfcbf1a2296c24d4fb2;hb=2a035cba6169185444e7074952b5a2f5cebb3bb1;hp=768b19fa3e7f31b971142bcd4c49968200c36124;hpb=730ff1686220fca0176a8757b4444d63ab2afc44;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 768b19fa3e..402bf4822b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; +import akka.actor.ExtendedActorSystem; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; +import akka.serialization.JavaSerializer; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -58,6 +60,7 @@ import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; @@ -82,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; +import org.opendaylight.controller.cluster.messaging.MessageAssembler; import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; @@ -160,6 +164,8 @@ public class Shard extends RaftActor { private final ShardStats shardMBean; + private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean; + private DatastoreContext datastoreContext; private final ShardCommitCoordinator commitCoordinator; @@ -191,6 +197,8 @@ public class Shard extends RaftActor { private final MessageSlicer responseMessageSlicer; private final Dispatchers dispatchers; + private final MessageAssembler requestMessageAssembler; + protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); @@ -247,6 +255,15 @@ public class Shard extends RaftActor { .messageSliceSize(datastoreContext.getMaximumMessageSliceSize()) .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) .expireStateAfterInactivity(2, TimeUnit.MINUTES).build(); + + requestMessageAssembler = MessageAssembler.builder().logContext(this.name) + .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory()) + .assembledMessageCallback((message, sender) -> self().tell(message, sender)) + .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build(); + + listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(), + self()); + listenerInfoMXBean.register(); } private void setTransactionCommitTimeout() { @@ -275,6 +292,7 @@ public class Shard extends RaftActor { commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this); shardMBean.unregisterMBean(); + listenerInfoMXBean.unregister(); } @Override @@ -301,6 +319,8 @@ public class Shard extends RaftActor { if (message instanceof RequestEnvelope) { handleRequestEnvelope((RequestEnvelope)message); + } else if (MessageAssembler.isHandledMessage(message)) { + handleRequestAssemblerMessage(message); } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); } else if (CreateTransaction.isSerializedType(message)) { @@ -329,8 +349,7 @@ public class Shard extends RaftActor { PeerAddressResolved resolved = (PeerAddressResolved) message; setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress()); } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) { - store.checkForExpiredTransactions(transactionCommitTimeout); - commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); + commitTimeoutCheck(); } else if (message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); } else if (message instanceof RegisterRoleChangeListener) { @@ -362,6 +381,13 @@ public class Shard extends RaftActor { } } + private void handleRequestAssemblerMessage(final Object message) { + dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> { + JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system()); + requestMessageAssembler.handleMessage(message, self()); + }); + } + @SuppressWarnings("checkstyle:IllegalCatch") private void handleRequestEnvelope(final RequestEnvelope envelope) { final long now = ticker().read(); @@ -391,6 +417,30 @@ public class Shard extends RaftActor { } } + private void commitTimeoutCheck() { + store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess); + commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this); + requestMessageAssembler.checkExpiredAssembledMessageState(); + } + + private Optional updateAccess(final SimpleShardDataTreeCohort cohort) { + final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId(); + final LeaderFrontendState state = knownFrontends.get(frontend); + if (state == null) { + // Not tell-based protocol, do nothing + return Optional.absent(); + } + + if (isIsolatedLeader()) { + // We are isolated and no new request can come through until we emerge from it. We are still updating + // liveness of frontend when we see it attempting to communicate. Use the last access timer. + return Optional.of(state.getLastSeenTicks()); + } + + // If this frontend has freshly connected, give it some time to catch up before killing its transactions. + return Optional.of(state.getLastConnectTicks()); + } + private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { @@ -452,7 +502,8 @@ public class Shard extends RaftActor { throw new OutOfSequenceEnvelopeException(0); } - private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) { + @Nonnull + private static ABIVersion selectVersion(final ConnectClientRequest message) { final Range clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion()); for (ABIVersion v : SUPPORTED_ABIVERSIONS) { if (clientRange.contains(v)) { @@ -500,7 +551,8 @@ public class Shard extends RaftActor { } } - private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + @Nullable + private RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. if (!isLeader() || paused || !isLeaderActive()) { @@ -633,7 +685,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(batched, getSender(), - "Could not commit transaction " + batched.getTransactionId()); + "Could not process BatchedModifications " + batched.getTransactionId()); } else { // If this is not the first batch and leadership changed in between batched messages, // we need to reconstruct previous BatchedModifications from the transaction @@ -686,7 +738,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(message, getSender(), - "Could not commit transaction " + message.getTransactionId()); + "Could not process ready local transaction " + message.getTransactionId()); } else { LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); @@ -705,7 +757,7 @@ public class Shard extends RaftActor { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) { messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), - "Could not commit transaction " + forwardedReady.getTransactionId()); + "Could not process forwarded ready transaction " + forwardedReady.getTransactionId()); } else { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); @@ -789,8 +841,11 @@ public class Shard extends RaftActor { @Override @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { - return new ShardRecoveryCoordinator(store, - restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG); + if (restoreFromSnapshot == null) { + return ShardRecoveryCoordinator.create(store, persistenceId(), LOG); + } + + return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot()); } @Override @@ -859,6 +914,8 @@ public class Shard extends RaftActor { knownFrontends = ImmutableMap.of(); } + requestMessageAssembler.close(); + if (!hasLeader()) { // No leader anywhere, nothing else to do return; @@ -927,7 +984,7 @@ public class Shard extends RaftActor { } @Override - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) .dataChangeListenerActors(changeSupport.getListenerActors()) .commitCohortActors(store.getCohortActors());