mavenBundle(CONTROLLER, "sal-common-impl").versionAsInProject(), // //
mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(), //
- mavenBundle("com.google.guava", "guava").versionAsInProject()
+ mavenBundle("com.google.guava", "guava").versionAsInProject(),
+ mavenBundle("com.github.romix", "java-concurrent-hash-trie-map").versionAsInProject()
);
}
output = new DataOutputStream(stream);
}
- public NormalizedNodeOutputStreamWriter(DataOutput output) throws IOException {
+ public NormalizedNodeOutputStreamWriter(DataOutput output) {
this.output = Preconditions.checkNotNull(output);
}
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import scala.concurrent.Future;
abstract class AbstractTransactionContext implements TransactionContext {
- protected final TransactionIdentifier identifier;
- protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+ private final List<Future<Object>> recordedOperationFutures = new ArrayList<>();
+ private final TransactionIdentifier identifier;
- AbstractTransactionContext(TransactionIdentifier identifier) {
+ protected AbstractTransactionContext(TransactionIdentifier identifier) {
this.identifier = identifier;
}
@Override
- public List<Future<Object>> getRecordedOperationFutures() {
- return recordedOperationFutures;
+ public final void copyRecordedOperationFutures(Collection<Future<Object>> target) {
+ target.addAll(recordedOperationFutures);
}
-}
\ No newline at end of file
+
+ protected final TransactionIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ protected final Collection<Future<Object>> copyRecordedOperationFutures() {
+ return ImmutableList.copyOf(recordedOperationFutures);
+ }
+
+ protected final int recordedOperationCount() {
+ return recordedOperationFutures.size();
+ }
+
+ protected final void recordOperationFuture(Future<Object> future) {
+ recordedOperationFutures.add(future);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.OnComplete;
+import java.util.List;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+final class ChainedTransactionProxy extends TransactionProxy {
+ private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class);
+
+ /**
+ * Stores the ready Futures from the previous Tx in the chain.
+ */
+ private final List<Future<ActorSelection>> previousReadyFutures;
+
+ /**
+ * Stores the ready Futures from this transaction when it is readied.
+ */
+ private volatile List<Future<ActorSelection>> readyFutures;
+
+ ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
+ String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
+ super(actorContext, transactionType, transactionChainId);
+ this.previousReadyFutures = previousReadyFutures;
+ }
+
+ List<Future<ActorSelection>> getReadyFutures() {
+ return readyFutures;
+ }
+
+ boolean isReady() {
+ return readyFutures != null;
+ }
+
+ @Override
+ protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
+ LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
+ readyFutures.size(), getTransactionChainId());
+ this.readyFutures = readyFutures;
+ }
+
+ /**
+ * This method is overridden to ensure the previous Tx's ready operations complete
+ * before we initiate the next Tx in the chain to avoid creation failures if the
+ * previous Tx's ready operations haven't completed yet.
+ */
+ @Override
+ protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
+ // Check if there are any previous ready Futures, otherwise let the super class handle it.
+ if(previousReadyFutures.isEmpty()) {
+ return super.sendFindPrimaryShardAsync(shardName);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+ previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
+ }
+
+ // Combine the ready Futures into 1.
+ Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+ previousReadyFutures, getActorContext().getClientDispatcher());
+
+ // Add a callback for completion of the combined Futures.
+ final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
+ OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+ if(failure != null) {
+ // A Ready Future failed so fail the returned Promise.
+ returnPromise.failure(failure);
+ } else {
+ LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
+ getIdentifier(), getTransactionChainId());
+
+ // Send the FindPrimaryShard message and use the resulting Future to complete the
+ // returned Promise.
+ returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
+
+ return returnPromise.future();
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.ArrayList;
+import java.util.List;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
+ private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
+ private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+
+ DataChangeListenerSupport(final Shard shard) {
+ super(shard);
+ }
+
+ @Override
+ void onLeadershipChange(final boolean isLeader) {
+ for (ActorSelection dataChangeListener : dataChangeListeners) {
+ dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
+ }
+
+ if (isLeader) {
+ for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
+ if(!reg.isClosed()) {
+ reg.setDelegate(createDelegate(reg.getRegisterChangeListener()));
+ }
+ }
+
+ delayedListenerRegistrations.clear();
+ }
+ }
+
+ @Override
+ void onMessage(final RegisterChangeListener message, final boolean isLeader) {
+
+ LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
+
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration;
+ if (isLeader) {
+ registration = createDelegate(message);
+ } else {
+ LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
+
+ DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
+ delayedListenerRegistrations.add(delayedReg);
+ registration = delayedReg;
+ }
+
+ ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
+
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
+
+ tellSender(new RegisterChangeListenerReply(listenerRegistration));
+ }
+
+ @Override
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> createDelegate(
+ final RegisterChangeListener message) {
+ ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
+
+ // Notify the listener if notifications should be enabled or not
+ // If this shard is the leader then it will enable notifications else
+ // it will not
+ dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
+
+ // Now store a reference to the data change listener so it can be notified
+ // at a later point if notifications should be enabled or disabled
+ dataChangeListeners.add(dataChangeListenerPath);
+
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+ new DataChangeListenerProxy(dataChangeListenerPath);
+
+ LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
+
+ return getShard().getDataStore().registerChangeListener(message.getPath(), listener,
+ message.getScope());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
+ * DataTreeChanged messages and dispatching their context to the user.
+ */
+final class DataTreeChangeListenerActor extends AbstractUntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class);
+ private final DOMDataTreeChangeListener listener;
+ private boolean notificationsEnabled = false;
+
+ private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ if (message instanceof DataTreeChanged) {
+ dataChanged((DataTreeChanged)message);
+ } else if (message instanceof EnableNotification) {
+ enableNotification((EnableNotification) message);
+ }
+ }
+
+ private void dataChanged(final DataTreeChanged message) {
+ // Do nothing if notifications are not enabled
+ if (!notificationsEnabled) {
+ LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener);
+ return;
+ }
+
+ LOG.debug("Sending change notification {} to listener {}", message.getChanges(), listener);
+
+ try {
+ this.listener.onDataTreeChanged(message.getChanges());
+ } catch (Exception e) {
+ LOG.error("Error notifying listener {}", this.listener, e);
+ }
+
+ // TODO: do we really need this?
+ // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
+ // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(DataTreeChangedReply.getInstance(), getSelf());
+ }
+ }
+
+ private void enableNotification(final EnableNotification message) {
+ notificationsEnabled = message.isEnabled();
+ LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+ listener);
+ }
+
+ public static Props props(final DOMDataTreeChangeListener listener) {
+ return Props.create(new DataTreeChangeListenerCreator(listener));
+ }
+
+ private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
+ private static final long serialVersionUID = 1L;
+ private final DOMDataTreeChangeListener listener;
+
+ DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ public DataTreeChangeListenerActor create() {
+ return new DataTreeChangeListenerActor(listener);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Proxy class for holding required state to lazily instantiate a listener registration with an
+ * asynchronously-discovered actor.
+ *
+ * @param <T> listener type
+ */
+final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
+ private final ActorRef dataChangeListenerActor;
+ private final ActorContext actorContext;
+
+ @GuardedBy("this")
+ private ActorSelection listenerRegistrationActor;
+
+ public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+ super(listener);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ }
+
+ @Override
+ protected synchronized void removeRegistration() {
+ if (listenerRegistrationActor != null) {
+ listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
+ listenerRegistrationActor = null;
+ }
+
+ dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ void init(final String shardName, final YangInstanceIdentifier treeId) {
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+ findFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable failure, final ActorRef shard) {
+ if (failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
+ "cannot be registered", shardName, getInstance(), treeId);
+ } else if (failure != null) {
+ LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
+ "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+ } else {
+ doRegistration(shard, treeId);
+ }
+ }
+ }, actorContext.getClientDispatcher());
+ }
+
+ private void setListenerRegistrationActor(final ActorSelection actor) {
+ if (actor == null) {
+ LOG.debug("Ignoring null actor on {}", this);
+ return;
+ }
+
+ synchronized (this) {
+ if (!isClosed()) {
+ this.listenerRegistrationActor = actor;
+ return;
+ }
+ }
+
+ // This registration has already been closed, notify the actor
+ actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
+ }
+
+ private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+
+ Future<Object> future = actorContext.executeOperationAsync(shard,
+ new RegisterDataTreeChangeListener(path, dataChangeListenerActor),
+ actorContext.getDatastoreContext().getShardInitializationTimeout());
+
+ future.onComplete(new OnComplete<Object>(){
+ @Override
+ public void onComplete(final Throwable failure, final Object result) {
+ if (failure != null) {
+ LOG.error("Failed to register DataTreeChangeListener {} at path {}",
+ getInstance(), path.toString(), failure);
+ } else {
+ RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
+ setListenerRegistrationActor(actorContext.actorSelection(
+ reply.getListenerRegistrationPath().path()));
+ }
+ }
+ }, actorContext.getClientDispatcher());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Actor co-located with a shard. It exists only to terminate the registration when
+ * asked to do so via {@link CloseDataTreeChangeListenerRegistration}.
+ */
+public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor {
+ private final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+ public DataTreeChangeListenerRegistrationActor(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+ this.registration = Preconditions.checkNotNull(registration);
+ }
+
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if (message instanceof CloseDataTreeChangeListenerRegistration) {
+ registration.close();
+ getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ public static Props props(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+ return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
+ }
+
+ private static final class DataTreeChangeListenerRegistrationCreator implements Creator<DataTreeChangeListenerRegistrationActor> {
+ private static final long serialVersionUID = 1L;
+ final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+ DataTreeChangeListenerRegistrationCreator(ListenerRegistration<DOMDataTreeChangeListener> registration) {
+ this.registration = Preconditions.checkNotNull(registration);
+ }
+
+ @Override
+ public DataTreeChangeListenerRegistrationActor create() {
+ return new DataTreeChangeListenerRegistrationActor(registration);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
+ private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
+ private final Collection<ActorSelection> actors = new ArrayList<>();
+
+ DataTreeChangeListenerSupport(final Shard shard) {
+ super(shard);
+ }
+
+ @Override
+ void onLeadershipChange(final boolean isLeader) {
+ if (isLeader) {
+ for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
+ reg.createDelegate(this);
+ }
+ delayedRegistrations.clear();
+ delayedRegistrations.trimToSize();
+ }
+
+ final EnableNotification msg = new EnableNotification(isLeader);
+ for (ActorSelection dataChangeListener : actors) {
+ dataChangeListener.tell(msg, getSelf());
+ }
+ }
+
+ @Override
+ void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+ LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
+
+ final ListenerRegistration<DOMDataTreeChangeListener> registration;
+ if (!isLeader) {
+ LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
+
+ DelayedDataTreeListenerRegistration delayedReg =
+ new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
+ delayedRegistrations.add(delayedReg);
+ registration = delayedReg;
+ } else {
+ registration = createDelegate(registerTreeChangeListener);
+ }
+
+ ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration));
+
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
+
+ tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration));
+ }
+
+ @Override
+ ListenerRegistration<DOMDataTreeChangeListener> createDelegate(final RegisterDataTreeChangeListener message) {
+ ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
+
+ // Notify the listener if notifications should be enabled or not
+ // If this shard is the leader then it will enable notifications else
+ // it will not
+ dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
+
+ // Now store a reference to the data change listener so it can be notified
+ // at a later point if notifications should be enabled or disabled
+ actors.add(dataChangeListenerPath);
+
+ DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
+
+ LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
+
+ return getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Intermediate proxy registration returned to the user when we cannot
+ * instantiate the registration immediately. It provides a bridge to
+ * a real registration which may materialize at some point in the future.
+ */
+final class DelayedDataTreeListenerRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
+ private final RegisterDataTreeChangeListener registerTreeChangeListener;
+ private volatile ListenerRegistration<DOMDataTreeChangeListener> delegate;
+ @GuardedBy("this")
+ private boolean closed;
+
+ DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
+ this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
+ }
+
+ synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> factory) {
+ if (!closed) {
+ this.delegate = factory.createDelegate(registerTreeChangeListener);
+ }
+ }
+
+ @Override
+ public DOMDataTreeChangeListener getInstance() {
+ final ListenerRegistration<DOMDataTreeChangeListener> d = delegate;
+ return d == null ? null : d.getInstance();
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+ if (delegate != null) {
+ delegate.close();
+ }
+ }
+ }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+final class DelayedListenerRegistration implements
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+
+ private volatile boolean closed;
+
+ private final RegisterChangeListener registerChangeListener;
+
+ private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> delegate;
+
+ DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
+ this.registerChangeListener = registerChangeListener;
+ }
+
+ void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration) {
+ this.delegate = registration;
+ }
+
+ boolean isClosed() {
+ return closed;
+ }
+
+ RegisterChangeListener getRegisterChangeListener() {
+ return registerChangeListener;
+ }
+
+ @Override
+ public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+ return delegate != null ? delegate.getInstance() : null;
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ if(delegate != null) {
+ delegate.close();
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Base class for factories instantiating delegates.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class DelegateFactory<M, D> {
+ abstract D createDelegate(M message);
+}
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
*
*/
public class DistributedDataStore implements DOMStore, SchemaContextListener,
- DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
+ DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private static final String UNKNOWN_TYPE = "unknown";
return listenerRegistrationProxy;
}
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) {
+ Preconditions.checkNotNull(treeId, "treeId should not be null");
+ Preconditions.checkNotNull(listener, "listener should not be null");
+
+ final String shardName = ShardStrategyFactory.getStrategy(treeId).findShard(treeId);
+ LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
+
+ final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
+ new DataTreeChangeListenerProxy<L>(actorContext, listener);
+ listenerRegistrationProxy.init(shardName, treeId);
+
+ return listenerRegistrationProxy;
+ }
+
@Override
public DOMStoreTransactionChain createTransactionChain() {
return new TransactionChainProxy(actorContext);
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Internal implementation of a {@link DOMDataTreeChangeListener} which
+ * encapsulates received notifications into a {@link DataTreeChanged}
+ * message and forwards them towards the client's {@link DataTreeChangeListenerActor}.
+ */
+final class ForwardingDataTreeChangeListener implements DOMDataTreeChangeListener {
+ private final ActorSelection actor;
+
+ ForwardingDataTreeChangeListener(final ActorSelection actor) {
+ this.actor = Preconditions.checkNotNull(actor, "actor should not be null");
+ }
+
+ @Override
+ public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ actor.tell(new DataTreeChanged(changes), ActorRef.noSender());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+
+/**
+ * Base class for factories instantiating delegates which are local to the
+ * shard leader.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+ private final Shard shard;
+
+ protected LeaderLocalDelegateFactory(final Shard shard) {
+ this.shard = Preconditions.checkNotNull(shard);
+ }
+
+ protected final ActorRef getSelf() {
+ return shard.getSelf();
+ }
+
+ protected final Shard getShard() {
+ return shard;
+ }
+
+ protected final String persistenceId() {
+ return shard.persistenceId();
+ }
+
+ protected final void tellSender(final Object message) {
+ shard.getSender().tell(message, getSelf());
+ }
+
+ protected final ActorRef createActor(final Props props) {
+ return shard.getContext().actorOf(props);
+ }
+
+ protected final ActorSelection selectActor(ActorRef ref) {
+ return shard.getContext().system().actorSelection(ref.path());
+ }
+
+ protected final ActorSelection selectActor(ActorPath path) {
+ return shard.getContext().system().actorSelection(path);
+ }
+
+ /**
+ * Invoked whenever the local shard's leadership role changes.
+ *
+ * @param isLeader true if the shard has become leader, false if it has
+ * become a follower.
+ */
+ abstract void onLeadershipChange(boolean isLeader);
+ abstract void onMessage(M message, boolean isLeader);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * A {@link DOMStoreThreePhaseCommitCohort} instance given out for empty transactions.
+ */
+final class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+ static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
+
+ private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE);
+
+ private NoOpDOMStoreThreePhaseCommitCohort() {
+ // Hidden to prevent instantiation
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return IMMEDIATE_BOOLEAN_SUCCESS;
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return IMMEDIATE_VOID_SUCCESS;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return IMMEDIATE_VOID_SUCCESS;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return IMMEDIATE_VOID_SUCCESS;
+ }
+}
\ No newline at end of file
@Override
public void closeTransaction() {
- LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+ LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
}
@Override
public Future<ActorSelection> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called", identifier);
+ LOG.debug("Tx {} readyTransaction called", getIdentifier());
operationLimiter.release();
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
operationLimiter.release();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
operationLimiter.release();
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
operationLimiter.release();
}
@Override
public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
+ LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
operationLimiter.release();
proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
}
@Override
public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
operationLimiter.release();
proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final ShardStats shardMBean;
- private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();
-
- private final List<DelayedListenerRegistration> delayedListenerRegistrations =
- Lists.newArrayList();
-
private DatastoreContext datastoreContext;
private SchemaContext schemaContext;
private final String txnDispatcherPath;
+ private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
+ private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
+
protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
- if(schemaContext != null) {
+ if (schemaContext != null) {
store.onGlobalContextUpdated(schemaContext);
}
} else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
+ changeSupport.onMessage((RegisterChangeListener) message, isLeader());
+ } else if (message instanceof RegisterDataTreeChangeListener) {
+ treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof PeerAddressResolved) {
store.onGlobalContextUpdated(schemaContext);
}
- private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
-
- LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
-
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration;
- if(isLeader()) {
- registration = doChangeListenerRegistration(registerChangeListener);
- } else {
- LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
-
- DelayedListenerRegistration delayedReg =
- new DelayedListenerRegistration(registerChangeListener);
- delayedListenerRegistrations.add(delayedReg);
- registration = delayedReg;
- }
-
- ActorRef listenerRegistration = getContext().actorOf(
- DataChangeListenerRegistration.props(registration));
-
- LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- persistenceId(), listenerRegistration.path());
-
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
- }
-
- private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> doChangeListenerRegistration(
- final RegisterChangeListener registerChangeListener) {
-
- ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
- registerChangeListener.getDataChangeListenerPath());
-
- // Notify the listener if notifications should be enabled or not
- // If this shard is the leader then it will enable notifications else
- // it will not
- dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
-
- // Now store a reference to the data change listener so it can be notified
- // at a later point if notifications should be enabled or disabled
- dataChangeListeners.add(dataChangeListenerPath);
-
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new DataChangeListenerProxy(dataChangeListenerPath);
-
- LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
-
- return store.registerChangeListener(registerChangeListener.getPath(), listener,
- registerChangeListener.getScope());
- }
-
- private boolean isMetricsCaptureEnabled(){
+ private boolean isMetricsCaptureEnabled() {
CommonConfig config = new CommonConfig(getContext().system().settings().config());
return config.isMetricCaptureEnabled();
}
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
- for (ActorSelection dataChangeListener : dataChangeListeners) {
- dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
- }
-
- if(isLeader) {
- for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
- if(!reg.isClosed()) {
- reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
- }
- }
-
- delayedListenerRegistrations.clear();
- }
+ changeSupport.onLeadershipChange(isLeader);
+ treeChangeSupport.onLeadershipChange(isLeader);
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader) {
+ if (!isLeader) {
if(LOG.isDebugEnabled()) {
LOG.debug(
"{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
ShardStats getShardMBean() {
return shardMBean;
}
-
- private static class DelayedListenerRegistration implements
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
-
- private volatile boolean closed;
-
- private final RegisterChangeListener registerChangeListener;
-
- private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> delegate;
-
- DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
- this.registerChangeListener = registerChangeListener;
- }
-
- void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration) {
- this.delegate = registration;
- }
-
- boolean isClosed() {
- return closed;
- }
-
- RegisterChangeListener getRegisterChangeListener() {
- return registerChangeListener;
- }
-
- @Override
- public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
- return delegate != null ? delegate.getInstance() : null;
- }
-
- @Override
- public void close() {
- closed = true;
- if(delegate != null) {
- delegate.close();
- }
- }
- }
}
@Override
public ListenableFuture<Void> commit() {
- OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+ OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
new TransactionRateLimitingCallback(actorContext);
return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-import scala.concurrent.Promise;
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
-
private interface State {
boolean isReady();
private void checkReadyState(State state) {
Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
}
-
- private static class ChainedTransactionProxy extends TransactionProxy {
-
- /**
- * Stores the ready Futures from the previous Tx in the chain.
- */
- private final List<Future<ActorSelection>> previousReadyFutures;
-
- /**
- * Stores the ready Futures from this transaction when it is readied.
- */
- private volatile List<Future<ActorSelection>> readyFutures;
-
- private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
- String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
- super(actorContext, transactionType, transactionChainId);
- this.previousReadyFutures = previousReadyFutures;
- }
-
- List<Future<ActorSelection>> getReadyFutures() {
- return readyFutures;
- }
-
- boolean isReady() {
- return readyFutures != null;
- }
-
- @Override
- protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
- LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
- readyFutures.size(), getTransactionChainId());
- this.readyFutures = readyFutures;
- }
-
- /**
- * This method is overridden to ensure the previous Tx's ready operations complete
- * before we initiate the next Tx in the chain to avoid creation failures if the
- * previous Tx's ready operations haven't completed yet.
- */
- @Override
- protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
- // Check if there are any previous ready Futures, otherwise let the super class handle it.
- if(previousReadyFutures.isEmpty()) {
- return super.sendFindPrimaryShardAsync(shardName);
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
- previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
- }
-
- // Combine the ready Futures into 1.
- Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
- previousReadyFutures, getActorContext().getClientDispatcher());
-
- // Add a callback for completion of the combined Futures.
- final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
- OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
- @Override
- public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
- if(failure != null) {
- // A Ready Future failed so fail the returned Promise.
- returnPromise.failure(failure);
- } else {
- LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
- getIdentifier(), getTransactionChainId());
-
- // Send the FindPrimaryShard message and use the resulting Future to complete the
- // returned Promise.
- returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
- }
- }
- };
-
- combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
-
- return returnPromise.future();
- }
- }
}
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
+import java.util.Collection;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Future;
void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
- List<Future<Object>> getRecordedOperationFutures();
+ void copyRecordedOperationFutures(Collection<Future<Object>> target);
}
@Override
public void closeTransaction() {
- LOG.debug("Tx {} closeTransaction called", identifier);
+ LOG.debug("Tx {} closeTransaction called", getIdentifier());
actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
- identifier, recordedOperationFutures.size());
+ getIdentifier(), recordedOperationCount());
// Send the remaining batched modifications if any.
// Future will fail. We need all prior operations and the ready operation to succeed
// in order to attempt commit.
- List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
- futureList.addAll(recordedOperationFutures);
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1);
+ copyRecordedOperationFutures(futureList);
futureList.add(withLastReplyFuture);
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
@Override
public ActorSelection checkedApply(Iterable<Object> notUsed) {
LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
- identifier);
+ getIdentifier());
// At this point all the Futures succeeded and we need to extract the cohort
// actor path from the ReadyTransactionReply. For the recorded operations, they
} else {
// Throwing an exception here will fail the Future.
throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
- identifier, serializedReadyReply.getClass()));
+ getIdentifier(), serializedReadyReply.getClass()));
}
}
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
private void batchModification(Modification modification) {
if(batchedModifications == null) {
- batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
transactionChainId);
}
private void sendAndRecordBatchedModifications() {
Future<Object> sentFuture = sendBatchedModifications();
if(sentFuture != null) {
- recordedOperationFutures.add(sentFuture);
+ recordOperationFuture(sentFuture);
}
}
Future<Object> sent = null;
if(batchedModifications != null) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
batchedModifications.getModifications().size(), ready);
}
batchedModifications.setReady(ready);
sent = executeOperationAsync(batchedModifications);
- batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
transactionChainId);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
batchModification(new DeleteModification(path));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
batchModification(new MergeModification(path, data));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
batchModification(new WriteModification(path, data));
}
public void readData(
final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
- LOG.debug("Tx {} readData called path = {}", identifier, path);
+ LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
// Send the remaining batched modifications if any.
// must wait for them to successfully complete. This is necessary to honor the read
// uncommitted semantics of the public API contract. If any one fails then fail the read.
- if(recordedOperationFutures.isEmpty()) {
+ if(recordedOperationCount() == 0) {
finishReadData(path, returnFuture);
} else {
LOG.debug("Tx {} readData: verifying {} previous recorded operations",
- identifier, recordedOperationFutures.size());
+ getIdentifier(), recordedOperationCount());
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
- Lists.newArrayList(recordedOperationFutures),
- actorContext.getClientDispatcher());
+ copyRecordedOperationFutures(), actorContext.getClientDispatcher());
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
throws Throwable {
if(failure != null) {
LOG.debug("Tx {} readData: a recorded operation failed: {}",
- identifier, failure);
+ getIdentifier(), failure);
returnFuture.setException(new ReadFailedException(
"The read could not be performed because a previous put, merge,"
+ "or delete operation failed", failure));
private void finishReadData(final YangInstanceIdentifier path,
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
- LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+ LOG.debug("Tx {} finishReadData called path = {}", getIdentifier(), path);
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object readResponse) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+ LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure);
returnFuture.setException(new ReadFailedException(
"Error reading data for path " + path, failure));
} else {
- LOG.debug("Tx {} read operation succeeded", identifier, failure);
+ LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure);
if (readResponse instanceof ReadDataReply) {
ReadDataReply reply = (ReadDataReply) readResponse;
@Override
public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
- LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
// Send the remaining batched modifications if any.
// uncommitted semantics of the public API contract. If any one fails then fail this
// request.
- if(recordedOperationFutures.isEmpty()) {
+ if(recordedOperationCount() == 0) {
finishDataExists(path, returnFuture);
} else {
LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
- identifier, recordedOperationFutures.size());
+ getIdentifier(), recordedOperationCount());
// Note: we make a copy of recordedOperationFutures to be on the safe side in case
// Futures#sequence accesses the passed List on a different thread, as
// recordedOperationFutures is not synchronized.
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
- Lists.newArrayList(recordedOperationFutures),
+ copyRecordedOperationFutures(),
actorContext.getClientDispatcher());
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
throws Throwable {
if(failure != null) {
LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
- identifier, failure);
+ getIdentifier(), failure);
returnFuture.setException(new ReadFailedException(
"The data exists could not be performed because a previous "
+ "put, merge, or delete operation failed", failure));
private void finishDataExists(final YangInstanceIdentifier path,
final SettableFuture<Boolean> returnFuture) {
- LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+ LOG.debug("Tx {} finishDataExists called path = {}", getIdentifier(), path);
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+ LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure);
returnFuture.setException(new ReadFailedException(
"Error checking data exists for path " + path, failure));
} else {
- LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+ LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure);
if (response instanceof DataExistsReply) {
returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Abstract superclass for transaction operations which should be executed
+ * on a {@link TransactionContext} at a later point in time.
+ */
+abstract class TransactionOperation {
+ /**
+ * Execute the delayed operation.
+ *
+ * @param transactionContext
+ */
+ protected abstract void invoke(TransactionContext transactionContext);
+}
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
* shards will be executed.
* </p>
*/
-public class TransactionProxy implements DOMStoreReadWriteTransaction {
+public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
public static enum TransactionType {
READ_ONLY,
WRITE_ONLY,
READ_WRITE;
- public static TransactionType fromInt(int type) {
- if(type == WRITE_ONLY.ordinal()) {
- return WRITE_ONLY;
- } else if(type == READ_WRITE.ordinal()) {
- return READ_WRITE;
- } else if(type == READ_ONLY.ordinal()) {
- return READ_ONLY;
- } else {
- throw new IllegalArgumentException("In TransactionType enum value" + type);
+ // Cache all values
+ private static final TransactionType[] VALUES = values();
+
+ public static TransactionType fromInt(final int type) {
+ try {
+ return VALUES[type];
+ } catch (IndexOutOfBoundsException e) {
+ throw new IllegalArgumentException("In TransactionType enum value " + type, e);
}
}
}
remoteTransactionActors = referent.remoteTransactionActors;
remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
actorContext = referent.actorContext;
- identifier = referent.identifier;
+ identifier = referent.getIdentifier();
}
@Override
private final TransactionType transactionType;
private final ActorContext actorContext;
- private final TransactionIdentifier identifier;
private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
this(actorContext, transactionType, "");
}
- public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
- String transactionChainId) {
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
+ super(createIdentifier(actorContext));
this.actorContext = Preconditions.checkNotNull(actorContext,
"actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
"schemaContext should not be null");
this.transactionChainId = transactionChainId;
+ LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
+ }
+
+ private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
String memberName = actorContext.getCurrentMemberName();
- if(memberName == null){
+ if (memberName == null) {
memberName = "UNKNOWN-MEMBER";
}
- this.identifier = new TransactionIdentifier(memberName, counter.getAndIncrement());
-
- LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
+ return new TransactionIdentifier(memberName, counter.getAndIncrement());
}
@VisibleForTesting
List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
TransactionContext transactionContext = txFutureCallback.getTransactionContext();
- if(transactionContext != null) {
- recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ if (transactionContext != null) {
+ transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
}
}
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Read operation on write-only transaction is not allowed");
- LOG.debug("Tx {} read {}", identifier, path);
+ LOG.debug("Tx {} read {}", getIdentifier(), path);
throttleOperation();
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
- LOG.debug("Tx {} exists {}", identifier, path);
+ LOG.debug("Tx {} exists {}", getIdentifier(), path);
throttleOperation();
checkModificationState();
- LOG.debug("Tx {} write {}", identifier, path);
+ LOG.debug("Tx {} write {}", getIdentifier(), path);
throttleOperation();
checkModificationState();
- LOG.debug("Tx {} merge {}", identifier, path);
+ LOG.debug("Tx {} merge {}", getIdentifier(), path);
throttleOperation();
checkModificationState();
- LOG.debug("Tx {} delete {}", identifier, path);
+ LOG.debug("Tx {} delete {}", getIdentifier(), path);
throttleOperation();
inReadyState = true;
- LOG.debug("Tx {} Readying {} transactions for commit", identifier,
+ LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
txFutureCallbackMap.size());
- if(txFutureCallbackMap.size() == 0) {
+ if (txFutureCallbackMap.isEmpty()) {
onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
- LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
+ LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
txFutureCallback.getShardName(), transactionChainId);
final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
onTransactionReady(cohortFutures);
return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
- identifier.toString());
+ getIdentifier().toString());
}
/**
protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
}
- @Override
- public Object getIdentifier() {
- return this.identifier;
- }
-
@Override
public void close() {
for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
return actorContext;
}
- /**
- * Interfaces for transaction operations to be invoked later.
- */
- private static interface TransactionOperation {
- void invoke(TransactionContext transactionContext);
- }
-
/**
* Implements a Future OnComplete callback for a CreateTransaction message. This class handles
* retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
if(transactionType == TransactionType.WRITE_ONLY &&
actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
- identifier, primaryShard);
+ getIdentifier(), primaryShard);
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
boolean invokeOperation = true;
synchronized(txOperationsOnComplete) {
if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete", identifier);
+ LOG.debug("Tx {} Adding operation on complete", getIdentifier());
invokeOperation = false;
txOperationsOnComplete.add(operation);
*/
private void tryCreateTransaction() {
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
}
- Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
+ Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable();
// is ok.
if(--createTxTries > 0) {
LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
- identifier, shardName);
+ getIdentifier(), shardName);
actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
new Runnable() {
// TransactionContext until after we've executed all cached TransactionOperations.
TransactionContext localTransactionContext;
if(failure != null) {
- LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
+ LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
- localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
- } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+ localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
+ } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
} else {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+ localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
}
executeTxOperatonsOnComplete(localTransactionContext);
}
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
- LOG.debug("Tx {} Received {}", identifier, reply);
+ LOG.debug("Tx {} Received {}", getIdentifier(), reply);
return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
reply.getTransactionPath(), reply.getVersion());
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
- return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
+ return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
operationCompleter);
} else if (transactionType == TransactionType.WRITE_ONLY &&
actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+ return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
} else {
- return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+ return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
}
}
}
-
- private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
- static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
-
- private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS =
- com.google.common.util.concurrent.Futures.immediateFuture(null);
- private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
- com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE);
-
- private NoOpDOMStoreThreePhaseCommitCohort() {
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- return IMMEDIATE_BOOLEAN_SUCCESS;
- }
-
- @Override
- public ListenableFuture<Void> preCommit() {
- return IMMEDIATE_VOID_SUCCESS;
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- return IMMEDIATE_VOID_SUCCESS;
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- return IMMEDIATE_VOID_SUCCESS;
- }
- }
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
- identifier, recordedOperationFutures.size());
+ getIdentifier(), recordedOperationCount());
// Send the remaining batched modifications if any.
@Override
public void deleteData(YangInstanceIdentifier path) {
- recordedOperationFutures.add(executeOperationAsync(
+ recordOperationFuture(executeOperationAsync(
new DeleteData(path, getRemoteTransactionVersion())));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- recordedOperationFutures.add(executeOperationAsync(
+ recordOperationFuture(executeOperationAsync(
new MergeData(path, data, getRemoteTransactionVersion())));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- recordedOperationFutures.add(executeOperationAsync(
+ recordOperationFuture(executeOperationAsync(
new WriteData(path, data, getRemoteTransactionVersion())));
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
- identifier, recordedOperationFutures.size());
+ getIdentifier(), recordedOperationCount());
// Send the ReadyTransaction message to the Tx actor.
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistration implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final CloseDataTreeChangeListenerRegistration INSTANCE = new CloseDataTreeChangeListenerRegistration();
+
+ private CloseDataTreeChangeListenerRegistration() {
+ }
+
+ public static CloseDataTreeChangeListenerRegistration getInstance() {
+ return INSTANCE;
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = new CloseDataTreeChangeListenerRegistrationReply();
+
+ private CloseDataTreeChangeListenerRegistrationReply() {
+ // Use getInstance() instead
+ }
+
+ public static CloseDataTreeChangeListenerRegistrationReply getInstance() {
+ return INSTANCE;
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * A message about a DataTree having been changed. The message is not
+ * serializable on purpose. For delegating the change across cluster nodes,
+ * this needs to be intercepted by a local agent and forwarded as
+ * a {@link DataTreeDelta}.
+ */
+public final class DataTreeChanged {
+ private final Collection<DataTreeCandidate> changes;
+
+ public DataTreeChanged(final Collection<DataTreeCandidate> changes) {
+ this.changes = Preconditions.checkNotNull(changes);
+ }
+
+ /**
+ * Return the data changes.
+ *
+ * @return Change events
+ */
+ public Collection<DataTreeCandidate> getChanges() {
+ return changes;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class DataTreeChangedReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final DataTreeChangedReply INSTANCE = new DataTreeChangedReply();
+
+ private DataTreeChangedReply() {
+ // Use getInstance() instead
+ }
+
+ public static DataTreeChangedReply getInstance() {
+ return INSTANCE;
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard
+ * leader.
+ */
+public final class RegisterDataTreeChangeListener implements Externalizable {
+ private static final long serialVersionUID = 1L;
+ private ActorRef dataTreeChangeListenerPath;
+ private YangInstanceIdentifier path;
+
+ public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) {
+ this.path = Preconditions.checkNotNull(path);
+ this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath);
+ }
+
+ public YangInstanceIdentifier getPath() {
+ return path;
+ }
+
+ public ActorRef getDataTreeChangeListenerPath() {
+ return dataTreeChangeListenerPath;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeObject(dataTreeChangeListenerPath);
+ SerializationUtils.serializePath(path, out);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ dataTreeChangeListenerPath = (ActorRef) in.readObject();
+ path = SerializationUtils.deserializePath(in);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Successful reply to a {@link RegisterDataTreeChangeListener} request.
+ */
+public final class RegisterDataTreeChangeListenerReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final ActorRef listenerRegistrationPath;
+
+ public RegisterDataTreeChangeListenerReply(final ActorRef listenerRegistrationPath) {
+ this.listenerRegistrationPath = Preconditions.checkNotNull(listenerRegistrationPath);
+ }
+
+ public ActorRef getListenerRegistrationPath() {
+ return listenerRegistrationPath;
+ }
+}
import com.google.common.util.concurrent.ListenableFuture;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.SleepingWaitStrategy;
+import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
*/
public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService {
private static final ListenableFuture<Void> NO_LISTENERS = Futures.immediateFuture(null);
- private static final WaitStrategy DEFAULT_STRATEGY = new SleepingWaitStrategy();
+ private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(1L, 30L, TimeUnit.MILLISECONDS);
private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS = new EventHandler<DOMNotificationRouterEvent>() {
@Override
public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) throws Exception {
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
+import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.slf4j.Logger;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
/**
- * Abstract DOM Store Transaction
+ * Abstract DOM Store Transaction.
*
* Convenience super implementation of DOM Store transaction which provides
* common implementation of {@link #toString()} and {@link #getIdentifier()}.
+ *
+ * It can optionally capture the context where it was allocated.
+ *
+ * <T> identifier type
*/
-abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
+@Beta
+public abstract class AbstractDOMStoreTransaction<T> implements DOMStoreTransaction {
private final Throwable debugContext;
- private final Object identifier;
+ private final T identifier;
+
+ protected AbstractDOMStoreTransaction(@Nonnull final T identifier) {
+ this(identifier, false);
+ }
- protected AbstractDOMStoreTransaction(final Object identifier, final boolean debug) {
+ protected AbstractDOMStoreTransaction(@Nonnull final T identifier, final boolean debug) {
this.identifier = Preconditions.checkNotNull(identifier, "Identifier must not be null.");
this.debugContext = debug ? new Throwable().fillInStackTrace() : null;
}
@Override
- public final Object getIdentifier() {
+ public final T getIdentifier() {
return identifier;
}
- protected final void warnDebugContext(final Logger logger) {
- if (debugContext != null) {
- logger.warn("Transaction {} has been allocated in the following context", identifier, debugContext);
- }
+ /**
+ * Return the context in which this transaction was allocated.
+ *
+ * @return The context in which this transaction was allocated, or null
+ * if the context was not recorded.
+ */
+ @Nullable public final Throwable getDebugContext() {
+ return debugContext;
}
@Override
* ToStringHelper instance
* @return ToStringHelper instance which was passed in
*/
- protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ protected ToStringHelper addToStringAttributes(@Nonnull final ToStringHelper toStringHelper) {
return toStringHelper.add("id", identifier);
}
-}
\ No newline at end of file
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} implementations,
+ * which forward most of their functionality to a backend {@link #delegate()}.
+ */
+@Beta
+public abstract class ForwardingDOMStoreThreePhaseCommitCohort extends ForwardingObject implements DOMStoreThreePhaseCommitCohort {
+ @Override
+ protected abstract DOMStoreThreePhaseCommitCohort delegate();
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return delegate().canCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return delegate().preCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate().abort();
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return delegate().commit();
+ }
+}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
-final class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort {
private final SnapshotBackedWriteTransaction transaction;
private final DOMStoreThreePhaseCommitCohort delegate;
private final DOMStoreTransactionChainImpl txChain;
- protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+ ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
this.transaction = Preconditions.checkNotNull(transaction);
this.delegate = Preconditions.checkNotNull(delegate);
}
@Override
- public ListenableFuture<Boolean> canCommit() {
- return delegate.canCommit();
- }
-
- @Override
- public ListenableFuture<Void> preCommit() {
- return delegate.preCommit();
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- return delegate.abort();
+ protected DOMStoreThreePhaseCommitCohort delegate() {
+ return delegate;
}
@Override
public ListenableFuture<Void> commit() {
- ListenableFuture<Void> commitFuture = delegate.commit();
+ ListenableFuture<Void> commitFuture = super.commit();
Futures.addCallback(commitFuture, new FutureCallback<Void>() {
@Override
public void onFailure(final Throwable t) {
});
return commitFuture;
}
+
}
\ No newline at end of file
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
return name + "-" + txCounter.getAndIncrement();
}
+ private static void warnDebugContext(AbstractDOMStoreTransaction<?> transaction) {
+ final Throwable ctx = transaction.getDebugContext();
+ if (ctx != null) {
+ LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
+ }
+ }
+
private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
private final SnapshotBackedWriteTransaction transaction;
private final DataTreeModification modification;
} catch (ConflictingModificationAppliedException e) {
LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
e.getPath());
- transaction.warnDebugContext(LOG);
+ warnDebugContext(transaction);
return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
} catch (DataValidationFailedException e) {
LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
e.getPath(), e);
- transaction.warnDebugContext(LOG);
+ warnDebugContext(transaction);
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
package org.opendaylight.controller.md.sal.dom.store.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
* which delegates most of its calls to similar methods provided by underlying snapshot.
*
*/
-final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction
+final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction<Object>
implements DOMStoreReadTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
* {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
*
*/
-class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction {
+class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object> implements DOMStoreWriteTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedWriteTransaction.class);
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, TransactionReadyPrototype> READY_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, TransactionReadyPrototype.class, "readyImpl");