X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=dbcbd3d02eff96f393e389d6b92ee5487b83ce18;hb=5dbaf1259ead1904536db204bbc742a3359c1eb1;hp=402bf4822b01a1f56a12ebfcbf1a2296c24d4fb2;hpb=8232a626b43fdd2f5799da0fbcfb0f02d3c8f4fb;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 402bf4822b..dbcbd3d02e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -78,7 +78,6 @@ import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload; import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -103,8 +102,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; @@ -162,6 +161,8 @@ public class Shard extends RaftActor { /// The name of this shard private final String name; + private final String shardName; + private final ShardStats shardMBean; private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean; @@ -183,8 +184,6 @@ public class Shard extends RaftActor { private final ShardSnapshotCohort snapshotCohort; private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); - private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this); - private ShardSnapshot restoreFromSnapshot; @@ -204,6 +203,7 @@ public class Shard extends RaftActor { Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION); this.name = builder.getId().toString(); + this.shardName = builder.getId().getShardName(); this.datastoreContext = builder.getDatastoreContext(); this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); this.frontendMetadata = new FrontendMetadata(name); @@ -214,15 +214,12 @@ public class Shard extends RaftActor { ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher = new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name); - ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher = - new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name); if (builder.getDataTree() != null) { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(), - treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata); + treeChangeListenerPublisher, name, frontendMetadata); } else { store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(), - builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, - dataChangeListenerPublisher, name, frontendMetadata); + builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata); } shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this); @@ -339,8 +336,6 @@ public class Shard extends RaftActor { handleAbortTransaction(AbortTransaction.fromSerializable(message)); } else if (CloseTransactionChain.isSerializedType(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); - } else if (message instanceof RegisterChangeListener) { - changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof RegisterDataTreeChangeListener) { treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader()); } else if (message instanceof UpdateSchemaContext) { @@ -400,9 +395,7 @@ public class Shard extends RaftActor { responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget()) .message(envelope.newSuccessEnvelope(success, executionTimeNanos)) .sendTo(envelope.getMessage().getReplyTo()).replyTo(self()) - .onFailureCallback(t -> { - LOG.warn("Error slicing response {}", success, t); - }).build())); + .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build())); } else { envelope.sendSuccess(success, executionTimeNanos); } @@ -479,7 +472,8 @@ public class Shard extends RaftActor { } if (cmp > 0) { LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId); - throw new RetiredGenerationException(existing.getIdentifier().getGeneration()); + throw new RetiredGenerationException(clientId.getGeneration(), + existing.getIdentifier().getGeneration()); } LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId); @@ -594,6 +588,10 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + String getShardName() { + return shardName; + } + @Override protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) { @@ -762,7 +760,8 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader); ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(), - forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit()); + forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(), + forwardedReady.getParticipatingShardNames()); readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); } @@ -883,7 +882,6 @@ public class Shard extends RaftActor { protected void onStateChanged() { boolean isLeader = isLeader(); boolean hasLeader = hasLeader(); - changeSupport.onLeadershipChange(isLeader, hasLeader); treeChangeSupport.onLeadershipChange(isLeader, hasLeader); // If this actor is no longer the leader close all the transaction chains @@ -932,6 +930,8 @@ public class Shard extends RaftActor { messagesToForward.size(), leader); for (Object message : messagesToForward) { + LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message); + leader.tell(message, self()); } } @@ -986,7 +986,6 @@ public class Shard extends RaftActor { @Override protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors()) - .dataChangeListenerActors(changeSupport.getListenerActors()) .commitCohortActors(store.getCohortActors()); } @@ -1025,7 +1024,7 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; - private TipProducingDataTree dataTree; + private DataTree dataTree; private volatile boolean sealed; protected AbstractBuilder(final Class shardClass) { @@ -1059,9 +1058,9 @@ public class Shard extends RaftActor { return self(); } - public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) { + public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) { checkSealed(); - this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider); + this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider); return self(); } @@ -1071,7 +1070,7 @@ public class Shard extends RaftActor { return self(); } - public T dataTree(final TipProducingDataTree newDataTree) { + public T dataTree(final DataTree newDataTree) { checkSealed(); this.dataTree = newDataTree; return self(); @@ -1097,7 +1096,7 @@ public class Shard extends RaftActor { return restoreFromSnapshot; } - public TipProducingDataTree getDataTree() { + public DataTree getDataTree() { return dataTree; } @@ -1128,7 +1127,7 @@ public class Shard extends RaftActor { } public static class Builder extends AbstractBuilder { - private Builder() { + Builder() { super(Shard.class); } }