import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import java.io.IOException;
import java.util.Arrays;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider;
import scala.concurrent.duration.FiniteDuration;
/**
private static final Collection<ABIVersion> SUPPORTED_ABIVERSIONS;
+ // Make sure to keep this in sync with the journal configuration in factory-akka.conf
+ public static final String NON_PERSISTENT_JOURNAL_ID = "akka.persistence.non-persistent.journal";
+
static {
final ABIVersion[] values = ABIVersion.values();
final ABIVersion[] real = Arrays.copyOfRange(values, 1, values.length - 1);
new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
- treeChangeListenerPublisher, name, frontendMetadata);
+ treeChangeListenerPublisher, name,
+ frontendMetadata);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
- builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
+ builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name,
+ frontendMetadata);
}
- shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
+ shardMBean = ShardStats.create(name, datastoreContext.getDataStoreMXBeanType(), this);
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
- this.name);
+ this.name, datastoreContext);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
store.resumeNextPendingTransaction();
+ } else if (GetKnownClients.INSTANCE.equals(message)) {
+ handleGetKnownClients();
} else if (!responseMessageSlicer.handleMessage(message)) {
super.handleNonRaftCommand(message);
}
}
}
+ private void handleGetKnownClients() {
+ final ImmutableSet<ClientIdentifier> clients;
+ if (isLeader()) {
+ clients = knownFrontends.values().stream()
+ .map(LeaderFrontendState::getIdentifier)
+ .collect(ImmutableSet.toImmutableSet());
+ } else {
+ clients = frontendMetadata.getClients();
+ }
+ sender().tell(new GetKnownClientsReply(clients), self());
+ }
+
private boolean hasLeader() {
return getLeaderId() != null;
}
}
protected void onDatastoreContext(final DatastoreContext context) {
- datastoreContext = context;
+ datastoreContext = verifyNotNull(context);
setTransactionCommitTimeout();
}
private void updateSchemaContext(final UpdateSchemaContext message) {
- updateSchemaContext(message.getSchemaContext());
+ updateSchemaContext(message.getEffectiveModelContext());
}
@VisibleForTesting
- void updateSchemaContext(final SchemaContext schemaContext) {
+ void updateSchemaContext(final @NonNull EffectiveModelContext schemaContext) {
store.updateSchemaContext(schemaContext);
}
return this.name;
}
+ @Override
+ public String journalPluginId() {
+ // This method may be invoked from super constructor (wonderful), hence we also need to handle the case of
+ // the field being uninitialized because our constructor is not finished.
+ if (datastoreContext != null && !datastoreContext.isPersistent()) {
+ return NON_PERSISTENT_JOURNAL_ID;
+ }
+ return super.journalPluginId();
+ }
+
@VisibleForTesting
ShardCommitCoordinator getCommitCoordinator() {
return commitCoordinator;
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
- private SchemaContextProvider schemaContextProvider;
+ private EffectiveModelContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private DataTree dataTree;
return self();
}
- public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
+ public T schemaContextProvider(final EffectiveModelContextProvider newSchemaContextProvider) {
checkSealed();
this.schemaContextProvider = requireNonNull(newSchemaContextProvider);
return self();
return datastoreContext;
}
- public SchemaContext getSchemaContext() {
- return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
+ public EffectiveModelContext getSchemaContext() {
+ return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext());
}
public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {