import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
* Base implementation of a distributed DOMStore.
*/
public abstract class AbstractDataStore implements DistributedDataStoreInterface, SchemaContextListener,
- DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher,
+ DatastoreContextPropertiesUpdater.Listener, DOMStoreTreeChangePublisher,
DOMDataTreeCommitCohortRegistry, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
.datastoreContextFactory(datastoreContextFactory)
.waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
.primaryShardInfoCache(primaryShardInfoCache)
- .restoreFromSnapshot(restoreFromSnapshot);
+ .restoreFromSnapshot(restoreFromSnapshot)
+ .distributedDataStore(this);
actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
} catch (Exception e) {
LOG.error("Failed to get actor for {}", clientProps, e);
clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
identifier = client.getIdentifier();
.duration().toMillis() * READY_WAIT_FACTOR;
}
+ @VisibleForTesting
+ protected AbstractDataStore(final ActorContext actorContext, final ClientIdentifier identifier,
+ final DataStoreClient clientActor) {
+ this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.client = clientActor;
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+ .duration().toMillis() * READY_WAIT_FACTOR;
+ }
+
protected final DataStoreClient getClient() {
return client;
}
for (int i = 0; i < 100; i++) {
try {
- return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
- ActorContext.BOUNDED_MAILBOX), shardManagerId);
+ return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher), shardManagerId);
} catch (Exception e) {
lastException = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
public CountDownLatch getWaitTillReadyCountDownLatch() {
return waitTillReadyCountDownLatch;
}
+
+ @SuppressWarnings("unchecked")
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
+ final YangInstanceIdentifier shardLookup,
+ final YangInstanceIdentifier insideShard,
+ final org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener delegate) {
+
+ Preconditions.checkNotNull(shardLookup, "shardLookup should not be null");
+ Preconditions.checkNotNull(insideShard, "insideShard should not be null");
+ Preconditions.checkNotNull(delegate, "delegate should not be null");
+
+ final String shardName = actorContext.getShardStrategyFactory().getStrategy(shardLookup).findShard(shardLookup);
+ LOG.debug("Registering tree listener: {} for tree: {} shard: {}, path inside shard: {}",
+ delegate,shardLookup, shardName, insideShard);
+
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> listenerRegistrationProxy =
+ new DataTreeChangeListenerProxy<>(actorContext,
+ // wrap this in the ClusteredDOMDataTreeChangeLister interface
+ // since we always want clustered registration
+ (ClusteredDOMDataTreeChangeListener) delegate::onDataTreeChanged, insideShard);
+ listenerRegistrationProxy.init(shardName);
+
+ return (ListenerRegistration<L>) listenerRegistrationProxy;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+ final YangInstanceIdentifier internalPath,
+ final DOMDataTreeChangeListener delegate) {
+ Preconditions.checkNotNull(delegate, "delegate should not be null");
+
+ LOG.debug("Registering a listener for the configuration shard: {}", internalPath);
+
+ final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, delegate, internalPath);
+ proxy.init(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+ return (ListenerRegistration<L>) proxy;
+ }
+
}