/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
+import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
private final ShardedDOMDataTree shardedDOMDataTree;
private final ActorSystem actorSystem;
- private final DistributedDataStore distributedOperDatastore;
- private final DistributedDataStore distributedConfigDatastore;
+ private final AbstractDataStore distributedOperDatastore;
+ private final AbstractDataStore distributedConfigDatastore;
private final ActorRef shardedDataTreeActor;
private final MemberName memberName;
private final PrefixedShardConfigUpdateHandler updateHandler;
public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
- final DistributedDataStore distributedOperDatastore,
- final DistributedDataStore distributedConfigDatastore) {
+ final AbstractDataStore distributedOperDatastore,
+ final AbstractDataStore distributedConfigDatastore) {
this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
createPrefixConfigShard(distributedOperDatastore);
}
- private void createPrefixConfigShard(final DistributedDataStore dataStore) {
+ private void createPrefixConfigShard(final AbstractDataStore dataStore) {
Configuration configuration = dataStore.getActorContext().getConfiguration();
Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
CreateShard createShardMessage =
private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
- final DistributedDataStore distributedDataStore =
+ final AbstractDataStore distributedDataStore =
prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
? distributedConfigDatastore : distributedOperDatastore;
- try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
+ try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
final Entry<DataStoreClient, ActorRef> entry =
createDatastoreClient(shardName, distributedDataStore.getActorContext());
DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
final DOMDataTreeIdentifier prefix) {
- return shards.lookup(prefix);
-
+ synchronized (shards) {
+ return shards.lookup(prefix);
+ }
}
DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
final Future<Void> closeFuture = ask.transform(
new Mapper<Object, Void>() {
@Override
- public Void apply(Object parameter) {
+ public Void apply(final Object parameter) {
return null;
}
},
new Mapper<Throwable, Throwable>() {
@Override
- public Throwable apply(Throwable throwable) {
+ public Throwable apply(final Throwable throwable) {
return throwable;
}
}, actorSystem.dispatcher());
}
}
- private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
+ // TODO what about producers created by this producer?
+ // They should also be CDSProducers
+ private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer {
private final DOMDataTreeProducer delegate;
private final Collection<DOMDataTreeIdentifier> subtrees;
private final ActorRef shardDataTreeActor;
private final ActorContext actorContext;
+ @GuardedBy("shardAccessMap")
+ private final Map<DOMDataTreeIdentifier, CDSShardAccessImpl> shardAccessMap = new HashMap<>();
ProxyProducer(final DOMDataTreeProducer delegate,
final Collection<DOMDataTreeIdentifier> subtrees,
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() throws DOMDataTreeProducerException {
delegate.close();
+ synchronized (shardAccessMap) {
+ shardAccessMap.values().forEach(CDSShardAccessImpl::close);
+ }
final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
if (o instanceof DOMDataTreeProducerException) {
- throw ((DOMDataTreeProducerException) o);
+ throw (DOMDataTreeProducerException) o;
} else if (o instanceof Throwable) {
throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
}
protected DOMDataTreeProducer delegate() {
return delegate;
}
+
+ @Nonnull
+ @Override
+ public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) {
+ synchronized (shardAccessMap) {
+ Preconditions.checkArgument(subtrees.contains(subtree),
+ "Subtree {} is not controlled by this producer {}", subtree, this);
+ if (shardAccessMap.get(subtree) != null) {
+ return shardAccessMap.get(subtree);
+ }
+
+ // TODO Maybe we can have static factory method and return the same instance
+ // for same subtrees. But maybe it is not needed since there can be only one
+ // producer attached to some subtree at a time. And also how we can close ShardAccess
+ // then
+ final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(subtree, actorContext);
+ return shardAccessMap.put(subtree, shardAccess);
+ }
+ }
}
}