import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final ShardStats shardMBean;
- private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();
-
- private final List<DelayedListenerRegistration> delayedListenerRegistrations =
- Lists.newArrayList();
-
private DatastoreContext datastoreContext;
- private DataPersistenceProvider dataPersistenceProvider;
-
private SchemaContext schemaContext;
private int createSnapshotTransactionCounter;
private final String txnDispatcherPath;
+ private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
+ private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
+
protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.dataPersistenceProvider = (datastoreContext.isPersistent())
- ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
.getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+ setPersistence(datastoreContext.isPersistent());
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
- if(schemaContext != null) {
+ if (schemaContext != null) {
store.onGlobalContextUpdated(schemaContext);
}
} else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
+ changeSupport.onMessage((RegisterChangeListener) message, isLeader());
+ } else if (message instanceof RegisterDataTreeChangeListener) {
+ treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof PeerAddressResolved) {
setTransactionCommitTimeout();
- if(datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
- dataPersistenceProvider = new PersistentDataProvider();
- } else if(!datastoreContext.isPersistent() &&
- dataPersistenceProvider instanceof PersistentDataProvider) {
- dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ if(datastoreContext.isPersistent() && !persistence().isRecoveryApplicable()) {
+ setPersistence(true);
+ } else if(!datastoreContext.isPersistent() && persistence().isRecoveryApplicable()) {
+ setPersistence(false);
}
updateConfigParams(datastoreContext.getShardRaftConfig());
store.onGlobalContextUpdated(schemaContext);
}
- private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
-
- LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
-
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration;
- if(isLeader()) {
- registration = doChangeListenerRegistration(registerChangeListener);
- } else {
- LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
-
- DelayedListenerRegistration delayedReg =
- new DelayedListenerRegistration(registerChangeListener);
- delayedListenerRegistrations.add(delayedReg);
- registration = delayedReg;
- }
-
- ActorRef listenerRegistration = getContext().actorOf(
- DataChangeListenerRegistration.props(registration));
-
- LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- persistenceId(), listenerRegistration.path());
-
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
- }
-
- private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> doChangeListenerRegistration(
- final RegisterChangeListener registerChangeListener) {
-
- ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
- registerChangeListener.getDataChangeListenerPath());
-
- // Notify the listener if notifications should be enabled or not
- // If this shard is the leader then it will enable notifications else
- // it will not
- dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
- // Now store a reference to the data change listener so it can be notified
- // at a later point if notifications should be enabled or disabled
- dataChangeListeners.add(dataChangeListenerPath);
-
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new DataChangeListenerProxy(dataChangeListenerPath);
-
- LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
-
- return store.registerChangeListener(registerChangeListener.getPath(), listener,
- registerChangeListener.getScope());
- }
-
- private boolean isMetricsCaptureEnabled(){
+ private boolean isMetricsCaptureEnabled() {
CommonConfig config = new CommonConfig(getContext().system().settings().config());
return config.isMetricCaptureEnabled();
}
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
- for (ActorSelection dataChangeListener : dataChangeListeners) {
- dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
- }
-
- if(isLeader) {
- for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
- if(!reg.isClosed()) {
- reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
- }
- }
-
- delayedListenerRegistrations.clear();
- }
+ changeSupport.onLeadershipChange(isLeader);
+ treeChangeSupport.onLeadershipChange(isLeader);
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader) {
+ if (!isLeader) {
if(LOG.isDebugEnabled()) {
LOG.debug(
"{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
}
@Override
- protected DataPersistenceProvider persistence() {
- return dataPersistenceProvider;
- }
-
- @Override public String persistenceId() {
+ public String persistenceId() {
return this.name;
}
- @VisibleForTesting
- DataPersistenceProvider getDataPersistenceProvider() {
- return dataPersistenceProvider;
- }
-
@VisibleForTesting
ShardCommitCoordinator getCommitCoordinator() {
return commitCoordinator;
ShardStats getShardMBean() {
return shardMBean;
}
-
- private static class DelayedListenerRegistration implements
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
-
- private volatile boolean closed;
-
- private final RegisterChangeListener registerChangeListener;
-
- private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> delegate;
-
- DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
- this.registerChangeListener = registerChangeListener;
- }
-
- void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration) {
- this.delegate = registration;
- }
-
- boolean isClosed() {
- return closed;
- }
-
- RegisterChangeListener getRegisterChangeListener() {
- return registerChangeListener;
- }
-
- @Override
- public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
- return delegate != null ? delegate.getInstance() : null;
- }
-
- @Override
- public void close() {
- closed = true;
- if(delegate != null) {
- delegate.close();
- }
- }
- }
}