X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=7ca79fc2349add0e9c19236a945aaed53676b45d;hp=c98e11d60a2e063395ed0b428d2eb5baaee77ea2;hb=43fb391bf873b252383a8d736b2651b04da8d40d;hpb=057b787289f7b909d7013c22ac73a1c91c860af8 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c98e11d60a..7ca79fc234 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -12,18 +12,40 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; +import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; +import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; @@ -40,16 +62,17 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionCh import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; +import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -59,6 +82,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -72,7 +96,8 @@ import scala.concurrent.duration.FiniteDuration; /** * A Shard represents a portion of the logical data tree. - *

+ * + *

* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it */ public class Shard extends RaftActor { @@ -96,6 +121,17 @@ public class Shard extends RaftActor { // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant. public static final String DEFAULT_NAME = "default"; + private static final Collection SUPPORTED_ABIVERSIONS; + + static { + final ABIVersion[] values = ABIVersion.values(); + final ABIVersion[] real = Arrays.copyOfRange(values, 1, values.length - 1); + SUPPORTED_ABIVERSIONS = ImmutableList.copyOf(real).reverse(); + } + + // FIXME: make this a dynamic property based on mailbox size and maximum number of clients + private static final int CLIENT_MAX_MESSAGES = 1000; + // The state of this Shard private final ShardDataTree store; @@ -129,6 +165,7 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; private final FrontendMetadata frontendMetadata = new FrontendMetadata(); + private final Map knownFrontends = new HashMap<>(); protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -151,7 +188,8 @@ public class Shard extends RaftActor { treeChangeListenerPublisher, dataChangeListenerPublisher, name); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - treeChangeListenerPublisher, dataChangeListenerPublisher, name); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, + dataChangeListenerPublisher, name); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); @@ -171,8 +209,8 @@ public class Shard extends RaftActor { getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext, - new Dispatchers(context().system().dispatchers()).getDispatcherPath( - Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean); + new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction), + self(), getContext(), shardMBean, builder.getId().getShardName()); snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG, this.name); @@ -219,16 +257,36 @@ public class Shard extends RaftActor { } } + @SuppressWarnings("checkstyle:IllegalCatch") @Override protected void handleNonRaftCommand(final Object message) { - try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { + try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { final Optional maybeError = context.error(); if (maybeError.isPresent()) { LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), maybeError.get()); } - if (CreateTransaction.isSerializedType(message)) { + if (message instanceof RequestEnvelope) { + final long now = ticker().read(); + final RequestEnvelope envelope = (RequestEnvelope)message; + + try { + final RequestSuccess success = handleRequest(envelope, now); + if (success != null) { + envelope.sendSuccess(success, ticker().read() - now); + } + } catch (RequestException e) { + LOG.debug("{}: request {} failed", persistenceId(), envelope, e); + envelope.sendFailure(e, ticker().read() - now); + } catch (Exception e) { + LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), + ticker().read() - now); + } + } else if (message instanceof ConnectClientRequest) { + handleConnectClient((ConnectClientRequest)message); + } else if (CreateTransaction.isSerializedType(message)) { handleCreateTransaction(message); } else if (message instanceof BatchedModifications) { handleBatchedModifications((BatchedModifications)message); @@ -280,6 +338,87 @@ public class Shard extends RaftActor { } } + // Acquire our frontend tracking handle and verify generation matches + private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { + final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); + if (existing != null) { + final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration()); + if (cmp == 0) { + return existing; + } + if (cmp > 0) { + LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId); + throw new RetiredGenerationException(existing.getIdentifier().getGeneration()); + } + + LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId); + existing.retire(); + knownFrontends.remove(clientId.getFrontendId()); + } else { + LOG.debug("{}: client {} is not yet known", persistenceId(), clientId); + } + + final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store); + knownFrontends.put(clientId.getFrontendId(), ret); + LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId); + return ret; + } + + private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) { + final Range clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion()); + for (ABIVersion v : SUPPORTED_ABIVERSIONS) { + if (clientRange.contains(v)) { + return v; + } + } + + throw new IllegalArgumentException(String.format( + "No common version between backend versions %s and client versions %s", SUPPORTED_ABIVERSIONS, + clientRange)); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void handleConnectClient(final ConnectClientRequest message) { + try { + if (!isLeader() || !isLeaderActive()) { + LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message); + throw new NotLeaderException(getSelf()); + } + + final ABIVersion selectedVersion = selectVersion(message); + final LeaderFrontendState frontend = getFrontend(message.getTarget()); + frontend.reconnect(); + message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(), + ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion), + ActorRef.noSender()); + } catch (RequestException | RuntimeException e) { + message.getReplyTo().tell(new Failure(e), ActorRef.noSender()); + } + } + + private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + throws RequestException { + // We are not the leader, hence we want to fail-fast. + if (!isLeader() || !isLeaderActive()) { + LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope); + throw new NotLeaderException(getSelf()); + } + + final Request request = envelope.getMessage(); + if (request instanceof TransactionRequest) { + final TransactionRequest txReq = (TransactionRequest)request; + final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId(); + return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now); + } else if (request instanceof LocalHistoryRequest) { + final LocalHistoryRequest lhReq = (LocalHistoryRequest)request; + final ClientIdentifier clientId = lhReq.getTarget().getClientId(); + return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now); + } else { + LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request); + throw new UnsupportedRequestException(request); + } + } + private boolean hasLeader() { return getLeaderId() != null; } @@ -314,16 +453,15 @@ public class Shard extends RaftActor { updateConfigParams(datastoreContext.getShardRaftConfig()); } - boolean canSkipPayload() { - // If we do not have any followers and we are not using persistence we can apply modification to the state - // immediately - return !hasFollowers() && !persistence().isRecoveryApplicable(); - } - // applyState() will be invoked once consensus is reached on the payload - void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { - // We are faking the sender - persistData(self(), transactionId, payload); + void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) { + boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable(); + if (canSkipPayload) { + applyState(self(), id, payload); + } else { + // We are faking the sender + persistData(self(), id, payload, batchHint); + } } private void handleCommitTransaction(final CommitTransaction commit) { @@ -365,7 +503,7 @@ public class Shard extends RaftActor { } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionId(), e); - sender.tell(new akka.actor.Status.Failure(e), getSelf()); + sender.tell(new Failure(e), getSelf()); } } @@ -411,7 +549,7 @@ public class Shard extends RaftActor { private boolean failIfIsolatedLeader(final ActorRef sender) { if (isIsolatedLeader()) { - sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + sender.tell(new Failure(new NoShardLeaderException(String.format( "Shard %s was the leader but has lost contact with all of its followers. Either all" + " other follower nodes are down or this node is isolated by a network partition.", persistenceId()))), getSelf()); @@ -436,7 +574,7 @@ public class Shard extends RaftActor { } catch (Exception e) { LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionId(), e); - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + getSender().tell(new Failure(e), getSelf()); } } else { ActorSelection leader = getLeader(); @@ -477,7 +615,7 @@ public class Shard extends RaftActor { doAbortTransaction(abort.getTransactionId(), getSender()); } - void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) { + void doAbortTransaction(final Identifier transactionID, final ActorRef sender) { commitCoordinator.handleAbort(transactionID, sender, this); } @@ -487,13 +625,15 @@ public class Shard extends RaftActor { } else if (getLeader() != null) { getLeader().forward(message, getContext()); } else { - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException( + getSender().tell(new Failure(new NoShardLeaderException( "Could not create a shard transaction", persistenceId())), getSelf()); } } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - store.closeTransactionChain(closeTransactionChain.getIdentifier()); + final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier(); + store.closeTransactionChain(id, null); + store.purgeTransactionChain(id, null); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -510,7 +650,7 @@ public class Shard extends RaftActor { getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf()); } catch (Exception e) { - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + getSender().tell(new Failure(e), getSelf()); } } @@ -648,6 +788,13 @@ public class Shard extends RaftActor { store.setRunOnPendingTransactionsComplete(operation); } + @Override + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) + .dataChangeListenerActors(changeSupport.getListenerActors()) + .commitCohortActors(store.getCohortActors()); + } + @Override public String persistenceId() { return this.name;