/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.sharding; import static akka.actor.ActorRef.noSender; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ForwardingObject; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService; import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; /** * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system. */ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService, DistributedShardFactory { private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class); private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); private static final int MAX_ACTOR_CREATION_RETRIES = 100; private static final int ACTOR_RETRY_DELAY = 100; private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS; static final String ACTOR_ID = "ShardedDOMDataTreeFrontend"; private final ShardedDOMDataTree shardedDOMDataTree; private final ActorSystem actorSystem; private final DistributedDataStore distributedOperDatastore; private final DistributedDataStore distributedConfigDatastore; private final ActorRef shardedDataTreeActor; private final MemberName memberName; public DistributedShardedDOMDataTree(final ActorSystem actorSystem, final DistributedDataStore distributedOperDatastore, final DistributedDataStore distributedConfigDatastore) { this.actorSystem = Preconditions.checkNotNull(actorSystem); this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore); this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore); shardedDOMDataTree = new ShardedDOMDataTree(); shardedDataTreeActor = createShardedDataTreeActor(actorSystem, new ShardedDataTreeActorCreator() .setDataTreeService(shardedDOMDataTree) .setShardingService(shardedDOMDataTree) .setActorSystem(actorSystem) .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper()) .setDistributedConfigDatastore(distributedConfigDatastore) .setDistributedOperDatastore(distributedOperDatastore), ACTOR_ID); this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName(); } @Nonnull @Override public ListenerRegistration registerListener( final T listener, final Collection subtrees, final boolean allowRxMerges, final Collection producers) throws DOMDataTreeLoopException { throw new UnsupportedOperationException("Not implemented"); } @Nonnull @Override public DOMDataTreeProducer createProducer(@Nonnull final Collection subtrees) { LOG.debug("Creating producer for {}", subtrees); final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees); final Object response = distributedConfigDatastore.getActorContext() .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees)); if (response == null) { LOG.debug("Received success from remote nodes, creating producer:{}", subtrees); return new ProxyProducer(producer, subtrees, shardedDataTreeActor, distributedConfigDatastore.getActorContext()); } else if (response instanceof Exception) { closeProducer(producer); throw Throwables.propagate((Exception) response); } else { closeProducer(producer); throw new RuntimeException("Unexpected response to create producer received." + response); } } @Override @SuppressWarnings("checkstyle:IllegalCatch") public DistributedShardRegistration createDistributedShard( final DOMDataTreeIdentifier prefix, final Collection replicaMembers) throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException, DOMDataTreeShardCreationFailedException { final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier()); final DistributedDataStore distributedDataStore = prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION) ? distributedConfigDatastore : distributedOperDatastore; final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers); if (replicaMembers.contains(memberName)) { // spawn the backend shard and have the shard Manager create all replicas final ActorRef shardManager = distributedDataStore.getActorContext().getShardManager(); shardManager.tell(new CreatePrefixedShard(config, null, Shard.builder()), noSender()); } LOG.debug("Creating distributed datastore client for shard {}", shardName); final Props distributedDataStoreClientProps = SimpleDataStoreClientActor .props(memberName, "Shard-" + shardName, distributedDataStore.getActorContext(), shardName); final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps); final DataStoreClient client; try { client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); } catch (final Exception e) { LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e); clientActor.tell(PoisonPill.getInstance(), noSender()); throw new DOMDataTreeProducerException("Unable to create producer", e); } // register the frontend into the sharding service and let the actor distribute this onto the other nodes final ListenerRegistration shardFrontendRegistration; try (DOMDataTreeProducer producer = createProducer(Collections.singletonList(prefix))) { shardFrontendRegistration = shardedDOMDataTree .registerDataTreeShard(prefix, new ShardFrontend(client, prefix), ((ProxyProducer) producer).delegate()); } final Future future = distributedDataStore.getActorContext() .executeOperationAsync(shardedDataTreeActor, new PrefixShardCreated(config), DEFAULT_ASK_TIMEOUT); try { final Object result = Await.result(future, DEFAULT_ASK_TIMEOUT.duration()); if (result != null) { throw new DOMDataTreeShardCreationFailedException("Received unexpected response to PrefixShardCreated" + result); } return new DistributedShardRegistrationImpl(shardFrontendRegistration, prefix, shardedDataTreeActor); } catch (final CompletionException e) { shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender()); clientActor.tell(PoisonPill.getInstance(), noSender()); final Throwable cause = e.getCause(); if (cause instanceof DOMDataTreeShardingConflictException) { throw (DOMDataTreeShardingConflictException) cause; } throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e.getCause()); } catch (final Exception e) { shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender()); clientActor.tell(PoisonPill.getInstance(), noSender()); throw new DOMDataTreeShardCreationFailedException("Shard creation failed.", e); } } @Nonnull @Override public ListenerRegistration registerDataTreeShard( @Nonnull final DOMDataTreeIdentifier prefix, @Nonnull final T shard, @Nonnull final DOMDataTreeProducer producer) throws DOMDataTreeShardingConflictException { LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix); return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer); } private static void closeProducer(final DOMDataTreeProducer producer) { try { producer.close(); } catch (final DOMDataTreeProducerException e) { LOG.error("Unable to close producer", e); } } @SuppressWarnings("checkstyle:IllegalCatch") private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem, final ShardedDataTreeActorCreator creator, final String shardDataTreeActorId) { Exception lastException = null; for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) { try { return actorSystem.actorOf(creator.props(), shardDataTreeActorId); } catch (final Exception e) { lastException = e; Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT); LOG.debug("Could not create actor {} because of {} -" + " waiting for sometime before retrying (retry count = {})", shardDataTreeActorId, e.getMessage(), i); } } throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException); } private static class DistributedShardRegistrationImpl implements DistributedShardRegistration { private final ListenerRegistration registration; private final DOMDataTreeIdentifier prefix; private final ActorRef shardedDataTreeActor; DistributedShardRegistrationImpl(final ListenerRegistration registration, final DOMDataTreeIdentifier prefix, final ActorRef shardedDataTreeActor) { this.registration = registration; this.prefix = prefix; this.shardedDataTreeActor = shardedDataTreeActor; } @Override public void close() { // TODO send the correct messages to ShardManager to destroy the shard // maybe we could provide replica removal mechanisms also? shardedDataTreeActor.tell(new PrefixShardRemoved(prefix), noSender()); registration.close(); } } private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer { private final DOMDataTreeProducer delegate; private final Collection subtrees; private final ActorRef shardDataTreeActor; private final ActorContext actorContext; ProxyProducer(final DOMDataTreeProducer delegate, final Collection subtrees, final ActorRef shardDataTreeActor, final ActorContext actorContext) { this.delegate = Preconditions.checkNotNull(delegate); this.subtrees = Preconditions.checkNotNull(subtrees); this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor); this.actorContext = Preconditions.checkNotNull(actorContext); } @Nonnull @Override public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) { return delegate.createTransaction(isolated); } @Nonnull @Override public DOMDataTreeProducer createProducer(@Nonnull final Collection subtrees) { // TODO we probably don't need to distribute this on the remote nodes since once we have this producer // open we surely have the rights to all the subtrees. return delegate.createProducer(subtrees); } @Override public void close() throws DOMDataTreeProducerException { delegate.close(); final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees)); if (o instanceof DOMDataTreeProducerException) { throw ((DOMDataTreeProducerException) o); } else if (o instanceof Throwable) { throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o); } } @Override protected DOMDataTreeProducer delegate() { return delegate; } } }