import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
/// The name of this shard
private final String name;
+ private final String shardName;
+
private final ShardStats shardMBean;
private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
private final ShardSnapshotCohort snapshotCohort;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
- private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
-
private ShardSnapshot restoreFromSnapshot;
Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
this.name = builder.getId().toString();
+ this.shardName = builder.getId().getShardName();
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
this.frontendMetadata = new FrontendMetadata(name);
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
- ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
- treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
+ treeChangeListenerPublisher, name, frontendMetadata);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
- builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
- dataChangeListenerPublisher, name, frontendMetadata);
+ builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
}
shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
handleAbortTransaction(AbortTransaction.fromSerializable(message));
} else if (CloseTransactionChain.isSerializedType(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof RegisterDataTreeChangeListener) {
treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof UpdateSchemaContext) {
responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
.message(envelope.newSuccessEnvelope(success, executionTimeNanos))
.sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
- .onFailureCallback(t -> {
- LOG.warn("Error slicing response {}", success, t);
- }).build()));
+ .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build()));
} else {
envelope.sendSuccess(success, executionTimeNanos);
}
}
if (cmp > 0) {
LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
- throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+ throw new RetiredGenerationException(clientId.getGeneration(),
+ existing.getIdentifier().getGeneration());
}
LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
return roleChangeNotifier;
}
+ String getShardName() {
+ return shardName;
+ }
+
@Override
protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
final short leaderPayloadVersion) {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
- forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+ forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+ forwardedReady.getParticipatingShardNames());
readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(readyLocal, getContext());
}
protected void onStateChanged() {
boolean isLeader = isLeader();
boolean hasLeader = hasLeader();
- changeSupport.onLeadershipChange(isLeader, hasLeader);
treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
// If this actor is no longer the leader close all the transaction chains
messagesToForward.size(), leader);
for (Object message : messagesToForward) {
+ LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
leader.tell(message, self());
}
}
@Override
protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
- .dataChangeListenerActors(changeSupport.getListenerActors())
.commitCohortActors(store.getCohortActors());
}
private DatastoreContext datastoreContext;
private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
- private TipProducingDataTree dataTree;
+ private DataTree dataTree;
private volatile boolean sealed;
protected AbstractBuilder(final Class<S> shardClass) {
return self();
}
- public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
+ public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
checkSealed();
- this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
+ this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
return self();
}
return self();
}
- public T dataTree(final TipProducingDataTree newDataTree) {
+ public T dataTree(final DataTree newDataTree) {
checkSealed();
this.dataTree = newDataTree;
return self();
return restoreFromSnapshot;
}
- public TipProducingDataTree getDataTree() {
+ public DataTree getDataTree() {
return dataTree;
}
}
public static class Builder extends AbstractBuilder<Builder, Shard> {
- private Builder() {
+ Builder() {
super(Shard.class);
}
}