Simplify code using Java 8 features
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
index 35f5e998825951829be49b5e57e36a0ce3615cd4..64c3f14dfb5dde59806c31dd3f92e92af745ebc7 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
@@ -21,16 +21,19 @@ import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ClassToInstanceMap;
 import com.google.common.collect.ForwardingObject;
+import com.google.common.collect.ImmutableClassToInstanceMap;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.AbstractMap.SimpleEntry;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +76,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeServiceExtension;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
@@ -86,7 +90,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.compat.java8.FutureConverters;
-import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
@@ -122,9 +125,6 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
             DOMDataTreePrefixTable.create();
 
-    private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
-            new EnumMap<>(LogicalDatastoreType.class);
-
     private final EnumMap<LogicalDatastoreType, Entry<DataStoreClient, ActorRef>> configurationShardMap =
             new EnumMap<>(LogicalDatastoreType.class);
 
@@ -195,21 +195,17 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     public void init() {
         // create our writers to the configuration
         try {
-            LOG.debug("{} - starting config shard lookup.",
-                    distributedConfigDatastore.getActorContext().getCurrentMemberName());
+            LOG.debug("{} - starting config shard lookup.", memberName);
 
             // We have to wait for prefix config shards to be up and running
             // so we can create datastore clients for them
             handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(), SHARD_FUTURE_TIMEOUT_DURATION.unit());
-
-            LOG.debug("Prefix configuration shards ready - creating clients");
-
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             throw new IllegalStateException("Prefix config shards not found", e);
         }
 
         try {
-            LOG.debug("Prefix configuration shards ready - creating clients");
+            LOG.debug("{}: Prefix configuration shards ready - creating clients", memberName);
             configurationShardMap.put(LogicalDatastoreType.CONFIGURATION,
                     createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
                             distributedConfigDatastore.getActorContext()));
@@ -242,19 +238,8 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
 
         //create shard registration for DEFAULT_SHARD
-        try {
-            defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
-                    initDefaultShard(LogicalDatastoreType.CONFIGURATION));
-        } catch (final InterruptedException | ExecutionException e) {
-            throw new IllegalStateException("Unable to create default shard frontend for config shard", e);
-        }
-
-        try {
-            defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
-                    initDefaultShard(LogicalDatastoreType.OPERATIONAL));
-        } catch (final InterruptedException | ExecutionException e) {
-            throw new IllegalStateException("Unable to create default shard frontend for operational shard", e);
-        }
+        initDefaultShard(LogicalDatastoreType.CONFIGURATION);
+        initDefaultShard(LogicalDatastoreType.OPERATIONAL);
     }
 
     private ListenableFuture<List<Void>> handleConfigShardLookup() {
@@ -273,7 +258,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
         ask.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(final Throwable throwable, final Object result) throws Throwable {
+            public void onComplete(final Throwable throwable, final Object result) {
                 if (throwable != null) {
                     future.setException(throwable);
                 } else {
@@ -294,27 +279,32 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers);
     }
 
+    @Override
+    public ClassToInstanceMap<DOMDataTreeServiceExtension> getExtensions() {
+        return ImmutableClassToInstanceMap.of();
+    }
+
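
getExtensions() is the mdsal extension-discovery hook; returning ImmutableClassToInstanceMap.of() advertises that this service offers no extensions. A minimal standalone sketch of the ClassToInstanceMap pattern (ExtensionMapExample and FooExtension are illustrative names, not part of this change):

    import com.google.common.collect.ClassToInstanceMap;
    import com.google.common.collect.ImmutableClassToInstanceMap;

    public class ExtensionMapExample {
        interface Extension {
        }

        static final class FooExtension implements Extension {
        }

        public static void main(final String[] args) {
            // A type-safe heterogeneous map: each key is the class of its value.
            final ClassToInstanceMap<Extension> extensions =
                    ImmutableClassToInstanceMap.<Extension>builder()
                            .put(FooExtension.class, new FooExtension())
                            .build();

            // getInstance() casts safely based on the key, no unchecked cast needed.
            final FooExtension foo = extensions.getInstance(FooExtension.class);
            System.out.println(foo != null); // true

            // A service with no extensions just returns the empty map, as above.
            System.out.println(ImmutableClassToInstanceMap.of().isEmpty()); // true
        }
    }
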
     @Nonnull
     @Override
     public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
-        LOG.debug("{} - Creating producer for {}",
-                distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
+        LOG.debug("{} - Creating producer for {}", memberName, subtrees);
         final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
 
         final Object response = distributedConfigDatastore.getActorContext()
                 .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
         if (response == null) {
-            LOG.debug("{} - Received success from remote nodes, creating producer:{}",
-                    distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
+            LOG.debug("{} - Received success from remote nodes, creating producer:{}", memberName, subtrees);
             return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
                     distributedConfigDatastore.getActorContext(), shards);
-        } else if (response instanceof Exception) {
-            closeProducer(producer);
-            throw Throwables.propagate((Exception) response);
-        } else {
-            closeProducer(producer);
-            throw new RuntimeException("Unexpected response to create producer received." + response);
         }
+
+        closeProducer(producer);
+
+        if (response instanceof Throwable) {
+            Throwables.throwIfUnchecked((Throwable) response);
+            throw new RuntimeException((Throwable) response);
+        }
+        throw new RuntimeException("Unexpected response to create producer received: " + response);
     }
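
The removed else-if branch used the deprecated Throwables.propagate(); the two added lines are Guava's documented replacement idiom: rethrow RuntimeExceptions and Errors as-is, wrap anything checked. Condensed into a standalone helper (rethrow is an illustrative name):

    import com.google.common.base.Throwables;

    public final class PropagateReplacement {
        private PropagateReplacement() {
        }

        // Equivalent of the deprecated Throwables.propagate(t).
        public static RuntimeException rethrow(final Throwable t) {
            Throwables.throwIfUnchecked(t); // rethrows RuntimeException or Error
            throw new RuntimeException(t);  // wraps checked throwables
        }
    }

Declaring a RuntimeException return type lets callers write "throw rethrow(t);" so the compiler treats the statement as terminal, even though the method never actually returns normally.
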
 
     @Override
@@ -366,37 +356,29 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
                 shardRegistrationPromise.failure(
                         new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable));
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return FutureConverters.toJava(shardRegistrationPromise.future());
     }
 
     void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
-        LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
-        final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
+        LOG.debug("{}: Resolving additions : {}", memberName, additions);
 // we need to register the shards from top to bottom, so we need to at least make sure the ordering reflects that
-        Collections.sort(list, (o1, o2) -> {
-            if (o1.getRootIdentifier().getPathArguments().size() < o2.getRootIdentifier().getPathArguments().size()) {
-                return -1;
-            } else if (o1.getRootIdentifier().getPathArguments().size()
-                    == o2.getRootIdentifier().getPathArguments().size()) {
-                return 0;
-            } else {
-                return 1;
-            }
-        });
-        list.forEach(this::createShardFrontend);
+        additions
+            .stream()
+            .sorted(Comparator.comparingInt(o -> o.getRootIdentifier().getPathArguments().size()))
+            .forEachOrdered(this::createShardFrontend);
     }
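
The hand-written three-way comparator and the copy-sort-forEach sequence collapse into a single stream pipeline; Comparator.comparingInt yields the same ordering by path depth. A standalone sketch of the equivalence (the path strings and class name are illustrative):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class SortByDepth {
        private static int depth(final String path) {
            return path.split("/").length;
        }

        public static void main(final String[] args) {
            final List<String> paths = Arrays.asList("a/b/c", "a", "a/b");

            // comparingInt replaces the manual <, ==, > branching;
            // forEachOrdered preserves the sorted encounter order.
            paths.stream()
                    .sorted(Comparator.comparingInt(SortByDepth::depth))
                    .forEachOrdered(System.out::println); // a, a/b, a/b/c
        }
    }
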
 
     void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
-        LOG.debug("Member {}: Resolving removals : {}", memberName, removals);
+        LOG.debug("{}: Resolving removals : {}", memberName, removals);
 
         // do we need to go from bottom to top?
         removals.forEach(this::despawnShardFrontend);
     }
 
     private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
-        LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
+        LOG.debug("{}: Creating CDS shard for prefix: {}", memberName, prefix);
         final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
         final AbstractDataStore distributedDataStore =
                 prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
@@ -409,9 +391,8 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
             final DistributedShardFrontend shard =
                     new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
 
-            @SuppressWarnings("unchecked")
             final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
-                    (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
+                    shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
 
             synchronized (shards) {
                 shards.store(prefix, reg);
@@ -428,14 +409,14 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     }
 
     private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
-        LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
+        LOG.debug("{}: Removing CDS shard for prefix: {}", memberName, prefix);
         final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup;
         synchronized (shards) {
             lookup = shards.lookup(prefix);
         }
 
         if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
-            LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
+            LOG.debug("{}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring.",
                     memberName, prefix);
             return;
         }
@@ -459,7 +440,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
             public void onFailure(final Throwable throwable) {
                 LOG.error("Removal of shard {} from configuration failed.", prefix, throwable);
             }
-        });
+        }, MoreExecutors.directExecutor());
     }
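
Both callback registrations now pass MoreExecutors.directExecutor() because newer Guava requires an explicit executor for Futures.addCallback; directExecutor() runs the callback on whichever thread completes the future, which is appropriate for cheap, non-blocking callbacks like the logging here. A minimal standalone sketch:

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.SettableFuture;

    public class CallbackExample {
        public static void main(final String[] args) {
            final SettableFuture<String> future = SettableFuture.create();

            Futures.addCallback(future, new FutureCallback<String>() {
                @Override
                public void onSuccess(final String result) {
                    System.out.println("done: " + result);
                }

                @Override
                public void onFailure(final Throwable throwable) {
                    System.err.println("failed: " + throwable);
                }
            }, MoreExecutors.directExecutor()); // executor argument is now mandatory

            future.set("ok"); // callback runs on this thread via directExecutor()
        }
    }
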
 
     DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
@@ -495,7 +476,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
             final String shardName, final ActorContext actorContext)
             throws DOMDataTreeShardCreationFailedException {
 
-        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+        LOG.debug("{}: Creating distributed datastore client for shard {}", memberName, shardName);
         final Props distributedDataStoreClientProps =
                 SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);
 
@@ -504,7 +485,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
             return new SimpleEntry<>(SimpleDataStoreClientActor
                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
         } catch (final Exception e) {
-            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            LOG.error("{}: Failed to get actor for {}", memberName, distributedDataStoreClientProps, e);
             clientActor.tell(PoisonPill.getInstance(), noSender());
             throw new DOMDataTreeShardCreationFailedException(
                     "Unable to create datastore client for shard{" + shardName + "}", e);
@@ -512,26 +493,20 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
-            throws ExecutionException, InterruptedException {
-        final Collection<MemberName> names =
-                distributedConfigDatastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards();
+    private void initDefaultShard(final LogicalDatastoreType logicalDatastoreType) {
 
         final PrefixedShardConfigWriter writer = writerMap.get(logicalDatastoreType);
 
         if (writer.checkDefaultIsPresent()) {
-            LOG.debug("Default shard for {} is already present in the config. Possibly saved in snapshot.",
-                    logicalDatastoreType);
-            return new DistributedShardRegistrationImpl(
-                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
-                    shardedDataTreeActor, this);
+            LOG.debug("{}: Default shard for {} is already present in the config. Possibly saved in snapshot.",
+                    memberName, logicalDatastoreType);
         } else {
             try {
-                // There can be situation when there is already started default shard
-                // because it is present in modules.conf. In that case we have to create
-                // just frontend for default shard, but not shard itself
-                // TODO we don't have to do it for config and operational default shard
-                // separately. Just one of them should be enough
+                // Currently the default shard configuration is present in the out-of-box modules.conf and is
+                // expected to be present. So look up the local default shard here and create the frontend.
+
+                // TODO we don't have to do it for config and operational default shard separately. Just one of them
+                // should be enough
                 final ActorContext actorContext = logicalDatastoreType == LogicalDatastoreType.CONFIGURATION
                         ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();
 
@@ -539,24 +514,27 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
                         actorContext.findLocalShard(ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
 
                 if (defaultLocalShardOptional.isPresent()) {
-                    LOG.debug("{} Default shard is already started, creating just frontend", logicalDatastoreType);
+                    LOG.debug("{}: Default shard for {} is already started, creating just frontend", memberName,
+                            logicalDatastoreType);
                     createShardFrontend(new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY));
-                    return new DistributedShardRegistrationImpl(
-                            new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
-                            shardedDataTreeActor, this);
                 }
 
-                // we should probably only have one node create the default shards
-                return Await.result(FutureConverters.toScala(createDistributedShard(
-                        new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)),
-                        SHARD_FUTURE_TIMEOUT_DURATION);
-            } catch (DOMDataTreeShardingConflictException e) {
-                LOG.debug("Default shard already registered, possibly due to other node doing it faster");
-                return new DistributedShardRegistrationImpl(
-                        new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
-                        shardedDataTreeActor, this);
+                // The local shard isn't present - we assume that means the local member isn't in the replica list
+                // and will be dynamically created later via an explicit add-shard-replica request. This is the
+                // bootstrapping mechanism to add a new node into an existing cluster. The following code to create
+                // the default shard as a prefix shard is problematic in this scenario, so it is commented out. Since
+                // the default shard is a module-based shard by default, it makes sense to always treat it as such,
+                // i.e. bootstrap it in the same manner as the special prefix-configuration and EOS shards.
+//                final Collection<MemberName> names = distributedConfigDatastore.getActorContext().getConfiguration()
+//                        .getUniqueMemberNamesForAllShards();
+//                Await.result(FutureConverters.toScala(createDistributedShard(
+//                        new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)),
+//                        SHARD_FUTURE_TIMEOUT_DURATION);
+//            } catch (DOMDataTreeShardingConflictException e) {
+//                LOG.debug("{}: Default shard for {} already registered, possibly due to other node doing it faster",
+//                        memberName, logicalDatastoreType);
             } catch (Exception e) {
-                LOG.error("{} default shard initialization failed", logicalDatastoreType, e);
+                LOG.error("{}: Default shard initialization for {} failed", memberName, logicalDatastoreType, e);
                 throw new RuntimeException(e);
             }
         }
@@ -667,6 +645,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
         @Nonnull
         @Override
+        @SuppressWarnings("checkstyle:hiddenField")
         public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
             // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
             // open we surely have the rights to all the subtrees.
@@ -700,11 +679,11 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) {
             Preconditions.checkArgument(
                     subtrees.stream().anyMatch(dataTreeIdentifier -> dataTreeIdentifier.contains(subtree)),
-                    "Subtree {} is not controlled by this producer {}", subtree, this);
+                    "Subtree %s is not controlled by this producer %s", subtree, this);
 
             final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                     shardTable.lookup(subtree);
-            Preconditions.checkState(lookup != null, "Subtree {} is not contained in any registered shard.");
+            Preconditions.checkState(lookup != null, "Subtree %s is not contained in any registered shard.", subtree);
 
             final DOMDataTreeIdentifier lookupId = lookup.getValue().getPrefix();
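
The message fixes in this last hunk matter because Guava Preconditions formats with %s placeholders, not SLF4J-style {}; the old messages would have printed the braces literally, and the old checkState() call never passed the subtree argument at all. A standalone sketch (the subtree value is illustrative):

    import com.google.common.base.Preconditions;

    public class PreconditionsFormat {
        public static void main(final String[] args) {
            final Object subtree = "subtree-1";
            try {
                // %s is expanded lazily, only when the check fails;
                // a {} placeholder would appear verbatim in the message.
                Preconditions.checkState(false,
                        "Subtree %s is not contained in any registered shard.", subtree);
            } catch (IllegalStateException e) {
                System.out.println(e.getMessage());
                // Subtree subtree-1 is not contained in any registered shard.
            }
        }
    }
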