/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
+import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
private final ShardedDOMDataTree shardedDOMDataTree;
private final ActorSystem actorSystem;
- private final DistributedDataStore distributedOperDatastore;
- private final DistributedDataStore distributedConfigDatastore;
+ private final AbstractDataStore distributedOperDatastore;
+ private final AbstractDataStore distributedConfigDatastore;
private final ActorRef shardedDataTreeActor;
private final MemberName memberName;
private final PrefixedShardConfigUpdateHandler updateHandler;
public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
- final DistributedDataStore distributedOperDatastore,
- final DistributedDataStore distributedConfigDatastore) {
+ final AbstractDataStore distributedOperDatastore,
+ final AbstractDataStore distributedConfigDatastore) {
this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
createPrefixConfigShard(distributedOperDatastore);
}
- private void createPrefixConfigShard(final DistributedDataStore dataStore) {
+ private static void createPrefixConfigShard(final AbstractDataStore dataStore) {
Configuration configuration = dataStore.getActorContext().getConfiguration();
Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
CreateShard createShardMessage =
LOG.debug("{} - Received success from remote nodes, creating producer:{}",
distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
- distributedConfigDatastore.getActorContext());
+ distributedConfigDatastore.getActorContext(), shards);
} else if (response instanceof Exception) {
closeProducer(producer);
throw Throwables.propagate((Exception) response);
private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
- final DistributedDataStore distributedDataStore =
+ final AbstractDataStore distributedDataStore =
prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
? distributedConfigDatastore : distributedOperDatastore;
- try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
+ try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
final Entry<DataStoreClient, ActorRef> entry =
createDatastoreClient(shardName, distributedDataStore.getActorContext());
final DistributedShardFrontend shard =
new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
- @SuppressWarnings("unchecked")
final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
- (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
+ shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
synchronized (shards) {
shards.store(prefix, reg);
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
- public void onSuccess(@Nullable Void result) {
+ public void onSuccess(@Nullable final Void result) {
LOG.debug("{} - Succesfuly removed shard for {}", memberName, prefix);
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
LOG.error("Removal of shard {} from configuration failed.", prefix, throwable);
}
});
DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
final DOMDataTreeIdentifier prefix) {
- return shards.lookup(prefix);
-
+ synchronized (shards) {
+ return shards.lookup(prefix);
+ }
}
DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
final Future<Void> closeFuture = ask.transform(
new Mapper<Object, Void>() {
@Override
- public Void apply(Object parameter) {
+ public Void apply(final Object parameter) {
return null;
}
},
new Mapper<Throwable, Throwable>() {
@Override
- public Throwable apply(Throwable throwable) {
+ public Throwable apply(final Throwable throwable) {
return throwable;
}
}, actorSystem.dispatcher());
}
}
- private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
+ // TODO what about producers created by this producer?
+ // They should also be CDSProducers
+ private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer {
private final DOMDataTreeProducer delegate;
private final Collection<DOMDataTreeIdentifier> subtrees;
private final ActorRef shardDataTreeActor;
private final ActorContext actorContext;
+ @GuardedBy("shardAccessMap")
+ private final Map<DOMDataTreeIdentifier, CDSShardAccessImpl> shardAccessMap = new HashMap<>();
+
+ // We don't have to guard access to shardTable in ProxyProducer.
+ // ShardTable's entries relevant to this ProxyProducer shouldn't
+ // change during producer's lifetime.
+ private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardTable;
ProxyProducer(final DOMDataTreeProducer delegate,
final Collection<DOMDataTreeIdentifier> subtrees,
final ActorRef shardDataTreeActor,
- final ActorContext actorContext) {
+ final ActorContext actorContext,
+ final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardLayout) {
this.delegate = Preconditions.checkNotNull(delegate);
this.subtrees = Preconditions.checkNotNull(subtrees);
this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.shardTable = Preconditions.checkNotNull(shardLayout);
}
@Nonnull
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() throws DOMDataTreeProducerException {
delegate.close();
+ synchronized (shardAccessMap) {
+ shardAccessMap.values().forEach(CDSShardAccessImpl::close);
+ }
+
final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
if (o instanceof DOMDataTreeProducerException) {
- throw ((DOMDataTreeProducerException) o);
+ throw (DOMDataTreeProducerException) o;
} else if (o instanceof Throwable) {
throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
}
protected DOMDataTreeProducer delegate() {
return delegate;
}
+
+ @Nonnull
+ @Override
+ public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) {
+ Preconditions.checkArgument(
+ subtrees.stream().anyMatch(dataTreeIdentifier -> dataTreeIdentifier.contains(subtree)),
+ "Subtree {} is not controlled by this producer {}", subtree, this);
+
+ final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
+ shardTable.lookup(subtree);
+ Preconditions.checkState(lookup != null, "Subtree {} is not contained in any registered shard.");
+
+ final DOMDataTreeIdentifier lookupId = lookup.getValue().getPrefix();
+
+ synchronized (shardAccessMap) {
+ if (shardAccessMap.get(lookupId) != null) {
+ return shardAccessMap.get(lookupId);
+ }
+
+ // TODO Maybe we can have static factory method and return the same instance
+ // for same subtrees. But maybe it is not needed since there can be only one
+ // producer attached to some subtree at a time. And also how we can close ShardAccess
+ // then
+ final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(lookupId, actorContext);
+ shardAccessMap.put(lookupId, shardAccess);
+ return shardAccess;
+ }
+ }
}
}